This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d6eb59677474 feat: Integrate the mdt compaction with existing flink 
compaction pipeline (#17991)
d6eb59677474 is described below

commit d6eb596774747e96a0ea970de2cac4bd82d34023
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Jan 30 15:16:43 2026 +0800

    feat: Integrate the mdt compaction with existing flink compaction pipeline 
(#17991)
    
    * feat: Integrate the mdt compaction with existing flink compaction pipeline
    
    ---------
    
    Co-authored-by: danny0405 <[email protected]>
---
 .../hudi/table/action/compact/CompactHelpers.java  |  15 +-
 .../apache/hudi/client/HoodieFlinkWriteClient.java |   8 +
 .../FlinkHoodieBackedTableMetadataWriter.java      |  15 +-
 .../org/apache/hudi/table/HoodieFlinkTable.java    |   2 +-
 .../src/main/java/org/apache/hudi/util/Lazy.java   |   3 +
 .../apache/hudi/configuration/FlinkOptions.java    |  21 ++
 .../apache/hudi/configuration/OptionsResolver.java |  21 +-
 .../java/org/apache/hudi/sink/CleanFunction.java   |  61 ++---
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  26 ++-
 .../hudi/sink/clustering/ClusteringCommitSink.java |   2 +-
 .../apache/hudi/sink/compact/CompactOperator.java  | 133 +++--------
 .../hudi/sink/compact/CompactionCommitEvent.java   |  14 +-
 .../hudi/sink/compact/CompactionCommitSink.java    | 152 +++---------
 .../hudi/sink/compact/CompactionPlanEvent.java     |  17 +-
 .../hudi/sink/compact/CompactionPlanOperator.java  | 124 +++-------
 .../hudi/sink/compact/FlinkCompactionConfig.java   |   2 +-
 .../hudi/sink/compact/handler/CleanHandler.java    | 118 ++++++++++
 .../sink/compact/handler/CompactCommitHandler.java | 257 +++++++++++++++++++++
 .../hudi/sink/compact/handler/CompactHandler.java  | 201 ++++++++++++++++
 .../compact/handler/CompactionPlanHandler.java     | 220 ++++++++++++++++++
 .../handler/MetadataCompactCommitHandler.java      | 138 +++++++++++
 .../compact/handler/MetadataCompactHandler.java    | 114 +++++++++
 .../handler/MetadataCompactionPlanHandler.java     | 179 ++++++++++++++
 .../org/apache/hudi/sink/v2/utils/PipelinesV2.java |   2 +-
 .../org/apache/hudi/table/HoodieTableSink.java     |   4 +-
 .../java/org/apache/hudi/util/CompactionUtil.java  |  76 ++++++
 .../java/org/apache/hudi/util/StreamerUtil.java    |  15 ++
 .../apache/hudi/sink/ITTestDataStreamWrite.java    |   4 +-
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |  47 +++-
 .../hudi/sink/utils/CompactFunctionWrapper.java    |   8 +-
 .../sink/utils/StreamWriteFunctionWrapper.java     |   2 +-
 .../apache/hudi/table/ITTestHoodieDataSource.java  |   9 +-
 .../apache/hudi/table/ITTestSchemaEvolution.java   |   2 +-
 .../org/apache/hudi/utils/TestCompactionUtil.java  | 128 +++++++++-
 .../test/java/org/apache/hudi/utils/TestUtils.java |  19 ++
 35 files changed, 1760 insertions(+), 399 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
index 9fd77a93e712..bc6e90fd460e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.InstantGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieCompactionException;
@@ -62,16 +63,24 @@ public class CompactHelpers<T, I, K, O> {
   public HoodieCommitMetadata createCompactionMetadata(
       HoodieTable table, String compactionInstantTime, HoodieData<WriteStatus> 
writeStatuses,
       String schema) throws IOException {
+    return createCompactionMetadata(table, compactionInstantTime, 
writeStatuses, schema, WriteOperationType.COMPACT);
+  }
+
+  public HoodieCommitMetadata createCompactionMetadata(
+      HoodieTable table, String compactionInstantTime, HoodieData<WriteStatus> 
writeStatuses,
+      String schema, WriteOperationType operationType) throws IOException {
     InstantGenerator instantGenerator = table.getInstantGenerator();
-    HoodieCompactionPlan compactionPlan = 
table.getActiveTimeline().readCompactionPlan(
-        instantGenerator.getCompactionRequestedInstant(compactionInstantTime));
+    HoodieInstant requestedInstant = operationType == 
WriteOperationType.COMPACT
+        ? instantGenerator.getCompactionRequestedInstant(compactionInstantTime)
+        : 
instantGenerator.getLogCompactionRequestedInstant(compactionInstantTime);
+    HoodieCompactionPlan compactionPlan = 
table.getActiveTimeline().readCompactionPlan(requestedInstant);
     List<HoodieWriteStat> updateStatusMap = 
writeStatuses.map(WriteStatus::getStat).collectAsList();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
     for (HoodieWriteStat stat : updateStatusMap) {
       metadata.addWriteStat(stat.getPartitionPath(), stat);
     }
     
metadata.addMetadata(org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY,
 schema);
-    metadata.setOperationType(WriteOperationType.COMPACT);
+    metadata.setOperationType(operationType);
     if (compactionPlan.getExtraMetadata() != null) {
       compactionPlan.getExtraMetadata().forEach(metadata::addMetadata);
     }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 1acb5dfde3db..c0a92dafed7b 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -57,6 +57,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.Path;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -439,6 +440,13 @@ public class HoodieFlinkWriteClient<T>
     throw new HoodieNotSupportedException("Clustering is not supported yet");
   }
 
+  /**
+   * Commit log compaction and track metrics.
+   */
+  public void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable 
table, String compactionCommitTime) {
+    tableServiceClient.completeLogCompaction(metadata, table, 
compactionCommitTime, Collections.emptyList());
+  }
+
   private void completeClustering(
       HoodieReplaceCommitMetadata metadata,
       HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> table,
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 58acec5705c1..268ae942b92e 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -104,6 +104,14 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     }
   }
 
+  /**
+   * Return the write client for metadata table, which will be used for 
compaction scheduling in flink write coordinator.
+   */
+  @Override
+  public BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>> 
getWriteClient() {
+    return super.getWriteClient();
+  }
+
   @Override
   protected void updateColumnsToIndexWithColStats(List<String> columnsToIndex) 
{
     // no op. HUDI-8801 to fix.
@@ -133,7 +141,12 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
   @Override
   protected void commitInternal(String instantTime, Map<String, 
HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
                                 Option<BulkInsertPartitioner> 
bulkInsertPartitioner) {
-    performTableServices(Option.ofNullable(instantTime), false);
+    // this method will be invoked during compaction of data table, here table 
services for mdt is
+    // not performed if streaming writes mdt is enabled, since the 
compaction/clean will be performed
+    // asynchronously in the dedicated compaction pipeline.
+    if (!dataWriteConfig.getMetadataConfig().isStreamingWriteEnabled()) {
+      performTableServices(Option.ofNullable(instantTime), false);
+    }
     metadataMetaClient.reloadActiveTimeline();
     super.commitInternal(instantTime, partitionRecordsMap, isInitializing, 
bulkInsertPartitioner);
   }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index ca7b7a2fa585..fee038151f6e 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -118,7 +118,7 @@ public abstract class HoodieFlinkTable<T>
     if (config.isMetadataTableEnabled() || 
getMetaClient().getTableConfig().isMetadataTableAvailable()) {
       return Option.of(FlinkHoodieBackedTableMetadataWriter.create(
           getContext().getStorageConf(), config, failedWritesCleaningPolicy, 
getContext(),
-          Option.of(triggeringInstantTimestamp), streamingWrites));
+          Option.ofNullable(triggeringInstantTimestamp), streamingWrites));
     } else {
       return Option.empty();
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java 
b/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java
index 2b135a0f383c..d0c9e80ae822 100644
--- a/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java
+++ b/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.util;
 
+import lombok.Getter;
+
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.function.Supplier;
@@ -30,6 +32,7 @@ import java.util.function.Supplier;
 @ThreadSafe
 public class Lazy<T> {
 
+  @Getter
   private volatile boolean initialized;
 
   private Supplier<T> initializer;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index e4795950e51e..7ce83e737324 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -228,6 +228,20 @@ public class FlinkOptions extends HoodieConfig {
       .withFallbackKeys(HoodieMetadataConfig.ENABLE.key())
       .withDescription("Enable the internal metadata table which serves table 
metadata like level file listings, default enabled");
 
+  @AdvancedConfig
+  public static final ConfigOption<Boolean> 
METADATA_COMPACTION_SCHEDULE_ENABLED = ConfigOptions
+      .key("metadata.compaction.schedule.enabled")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Schedule the compaction plan for metadata table, 
enabled by default.");
+
+  public static final ConfigOption<Boolean> METADATA_COMPACTION_ASYNC_ENABLED 
= ConfigOptions
+      .key("metadata.compaction.async.enabled")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Whether to enable async compaction for metadata table,"
+          + "if true, the compaction for metadata table will be performed in 
the compaction pipeline, default enabled.");
+
   public static final ConfigOption<Integer> METADATA_COMPACTION_DELTA_COMMITS 
= ConfigOptions
       .key("metadata.compaction.delta_commits")
       .intType()
@@ -894,6 +908,13 @@ public class FlinkOptions extends HoodieConfig {
       .noDefaultValue()
       .withDescription("Parallelism of tasks that do actual compaction, 
default same as the write task parallelism");
 
+  @AdvancedConfig
+  public static final ConfigOption<Boolean> 
COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED = ConfigOptions
+      .key("compaction.operation.execute.async.enabled")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Whether the compaction operation should be executed 
asynchronously on compact operator, default enabled.");
+
   public static final String NUM_COMMITS = "num_commits";
   public static final String TIME_ELAPSED = "time_elapsed";
   public static final String NUM_AND_TIME = "num_and_time";
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index b0734d9525e2..edd626886628 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -264,8 +264,25 @@ public class OptionsResolver {
    * @param conf The flink configuration.
    */
   public static boolean needsAsyncCompaction(Configuration conf) {
-    return OptionsResolver.isMorTable(conf)
-        && conf.get(FlinkOptions.COMPACTION_ASYNC_ENABLED);
+    return OptionsResolver.isMorTable(conf) && 
conf.get(FlinkOptions.COMPACTION_ASYNC_ENABLED);
+  }
+
+  /**
+   * Returns whether there is need to schedule the async metadata compaction.
+   *
+   * @param conf The flink configuration.
+   */
+  public static boolean needsAsyncMetadataCompaction(Configuration conf) {
+    return isStreamingIndexWriteEnabled(conf) && 
conf.get(FlinkOptions.METADATA_COMPACTION_ASYNC_ENABLED);
+  }
+
+  /**
+   * Returns whether there is need to schedule the compaction plan for the 
metadata table.
+   *
+   * @param conf The flink configuration.
+   */
+  public static boolean needsScheduleMdtCompaction(Configuration conf) {
+    return isStreamingIndexWriteEnabled(conf) && 
conf.get(FlinkOptions.METADATA_COMPACTION_SCHEDULE_ENABLED);
   }
 
   /**
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 9a2636383676..53452018baa5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -21,9 +21,12 @@ package org.apache.hudi.sink;
 import org.apache.hudi.adapter.AbstractRichFunctionAdapter;
 import org.apache.hudi.adapter.SinkFunctionAdapter;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.utils.NonThrownExecutor;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.sink.compact.handler.CleanHandler;
 import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -47,9 +50,11 @@ public class CleanFunction<T> extends 
AbstractRichFunctionAdapter
 
   protected HoodieFlinkWriteClient writeClient;
 
-  private NonThrownExecutor executor;
+  // clean handler for data table
+  private transient Option<CleanHandler> cleanHandlerOpt;
 
-  protected volatile boolean isCleaning;
+  // clean handler for metadata table
+  private transient Option<CleanHandler> mdtCleanHandlerOpt;
 
   public CleanFunction(Configuration conf) {
     this.conf = conf;
@@ -59,44 +64,29 @@ public class CleanFunction<T> extends 
AbstractRichFunctionAdapter
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     this.writeClient = FlinkWriteClients.createWriteClient(conf, 
getRuntimeContext());
-    this.executor = 
NonThrownExecutor.builder(log).waitForTasksFinish(true).build();
     if (conf.get(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
-      executor.execute(() -> {
-        this.isCleaning = true;
-        try {
-          this.writeClient.clean();
-        } finally {
-          this.isCleaning = false;
-        }
-      }, "wait for cleaning finish");
+      this.cleanHandlerOpt = Option.of(new CleanHandler(writeClient));
+      this.mdtCleanHandlerOpt = 
OptionsResolver.isStreamingIndexWriteEnabled(conf)
+          ? Option.of(new 
CleanHandler(StreamerUtil.createMetadataWriteClient(writeClient))) : 
Option.empty();
+    } else {
+      this.cleanHandlerOpt = Option.empty();
+      this.mdtCleanHandlerOpt = Option.empty();
     }
+
+    cleanHandlerOpt.ifPresent(CleanHandler::clean);
+    mdtCleanHandlerOpt.ifPresent(CleanHandler::clean);
   }
 
   @Override
   public void notifyCheckpointComplete(long l) throws Exception {
-    if (conf.get(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) {
-      executor.execute(() -> {
-        try {
-          this.writeClient.waitForCleaningFinish();
-        } finally {
-          // ensure to switch the isCleaning flag
-          this.isCleaning = false;
-        }
-      }, "wait for cleaning finish");
-    }
+    cleanHandlerOpt.ifPresent(CleanHandler::waitForCleaningFinish);
+    mdtCleanHandlerOpt.ifPresent(CleanHandler::waitForCleaningFinish);
   }
 
   @Override
   public void snapshotState(FunctionSnapshotContext context) throws Exception {
-    if (conf.get(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
-      try {
-        this.writeClient.startAsyncCleaning();
-        this.isCleaning = true;
-      } catch (Throwable throwable) {
-        // catch the exception to not affect the normal checkpointing
-        log.warn("Failed to start async cleaning", throwable);
-      }
-    }
+    cleanHandlerOpt.ifPresent(CleanHandler::startAsyncCleaning);
+    mdtCleanHandlerOpt.ifPresent(CleanHandler::startAsyncCleaning);
   }
 
   @Override
@@ -106,12 +96,7 @@ public class CleanFunction<T> extends 
AbstractRichFunctionAdapter
 
   @Override
   public void close() throws Exception {
-    if (executor != null) {
-      executor.close();
-    }
-
-    if (this.writeClient != null) {
-      this.writeClient.close();
-    }
+    cleanHandlerOpt.ifPresent(CleanHandler::close);
+    mdtCleanHandlerOpt.ifPresent(CleanHandler::close);
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index cea0fda848ad..107496834d06 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -129,8 +129,6 @@ public class StreamWriteOperatorCoordinator
    */
   protected final Context context;
 
-  private final boolean isStreamingIndexWriteEnabled;
-
   /**
    * Gateways for sending events to sub-tasks.
    */
@@ -141,6 +139,11 @@ public class StreamWriteOperatorCoordinator
    */
   private transient HoodieFlinkWriteClient writeClient;
 
+  /**
+   * Write client for the metadata table.
+   */
+  private transient HoodieFlinkWriteClient metadataWriteClient;
+
   /**
    * Meta client.
    */
@@ -209,7 +212,6 @@ public class StreamWriteOperatorCoordinator
     this.context = context;
     this.parallelism = context.currentParallelism();
     this.storageConf = 
HadoopFSUtils.getStorageConfWithCopy(HadoopConfigurations.getHiveConf(conf));
-    this.isStreamingIndexWriteEnabled = 
OptionsResolver.isStreamingIndexWriteEnabled(conf);
   }
 
   @Override
@@ -227,6 +229,11 @@ public class StreamWriteOperatorCoordinator
       this.writeClient = FlinkWriteClients.createWriteClient(conf);
       this.writeClient.tryUpgrade(instant, this.metaClient);
       initMetadataTable(this.writeClient);
+
+      if (tableState.scheduleMdtCompaction) {
+        this.metadataWriteClient = 
StreamerUtil.createMetadataWriteClient(writeClient);
+      }
+
       // start the executor
       this.executor = NonThrownExecutor.builder(log)
           .threadFactory(getThreadFactory("meta-event-handle"))
@@ -268,6 +275,9 @@ public class StreamWriteOperatorCoordinator
     if (writeClient != null) {
       writeClient.close();
     }
+    if (metadataWriteClient != null) {
+      metadataWriteClient.close();
+    }
     this.eventBuffers = null;
     if (this.clientIds != null) {
       this.clientIds.close();
@@ -458,6 +468,10 @@ public class StreamWriteOperatorCoordinator
     if (tableState.scheduleCompaction) {
       CompactionUtil.scheduleCompaction(writeClient, 
tableState.isDeltaTimeCompaction, committed);
     }
+    if (tableState.scheduleMdtCompaction) {
+      // schedule compaction for the metadata table
+      CompactionUtil.scheduleMetadataCompaction(metadataWriteClient, 
committed);
+    }
     // if clustering is on, schedule the clustering
     if (tableState.scheduleClustering) {
       ClusteringUtil.scheduleClustering(conf, writeClient, committed);
@@ -491,7 +505,7 @@ public class StreamWriteOperatorCoordinator
     this.instant = this.writeClient.startCommit(tableState.commitAction, 
this.metaClient);
     
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction,
 this.instant);
     // start commit for MDT if streaming writes to MDT is enabled
-    if (isStreamingIndexWriteEnabled) {
+    if (tableState.isStreamingIndexWriteEnabled) {
       this.writeClient.startCommitForMetadataTable(this.instant, 
this.writeClient.getHoodieTable());
     }
     this.writeClient.setWriteTimer(tableState.commitAction);
@@ -725,10 +739,12 @@ public class StreamWriteOperatorCoordinator
     final String commitAction;
     final boolean isOverwrite;
     final boolean scheduleCompaction;
+    final boolean scheduleMdtCompaction;
     final boolean scheduleClustering;
     final boolean syncHive;
     final boolean syncMetadata;
     final boolean isDeltaTimeCompaction;
+    final boolean isStreamingIndexWriteEnabled;
 
     private TableState(Configuration conf) {
       this.operationType = 
WriteOperationType.fromValue(conf.get(FlinkOptions.OPERATION));
@@ -737,9 +753,11 @@ public class StreamWriteOperatorCoordinator
       this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
       this.scheduleCompaction = OptionsResolver.needsScheduleCompaction(conf);
       this.scheduleClustering = OptionsResolver.needsScheduleClustering(conf);
+      this.scheduleMdtCompaction = 
OptionsResolver.needsScheduleMdtCompaction(conf);
       this.syncHive = conf.get(FlinkOptions.HIVE_SYNC_ENABLED);
       this.syncMetadata = conf.get(FlinkOptions.METADATA_ENABLED);
       this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
+      this.isStreamingIndexWriteEnabled = 
OptionsResolver.isStreamingIndexWriteEnabled(conf);
     }
 
     public static TableState create(Configuration conf) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index 97ef564b8776..c4c1efc21c0a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -220,7 +220,7 @@ public class ClusteringCommitSink extends 
CleanFunction<ClusteringCommitEvent> {
 
     clusteringMetrics.updateCommitMetrics(instant, 
writeMetadata.getCommitMetadata().get());
     // whether to clean up the input base parquet files used for clustering
-    if (!conf.get(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
+    if (!conf.get(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
       log.info("Running inline clean");
       this.writeClient.clean();
     }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
index 21cd4f2fbfd5..60d117a373af 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
@@ -20,22 +20,14 @@ package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.adapter.MaskingOutputAdapter;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.CompactionOperation;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.metrics.FlinkCompactionMetrics;
+import org.apache.hudi.sink.compact.handler.CompactHandler;
+import org.apache.hudi.sink.compact.handler.MetadataCompactHandler;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
-import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.table.HoodieFlinkTable;
-import 
org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
-import org.apache.hudi.table.format.FlinkRowDataReaderContext;
-import org.apache.hudi.table.format.InternalSchemaManager;
-import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.RuntimeContextUtils;
 
 import lombok.extern.slf4j.Slf4j;
@@ -50,11 +42,6 @@ import 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.table.runtime.operators.TableStreamOperator;
 import org.apache.flink.table.runtime.util.StreamRecordCollector;
-import org.apache.flink.util.Collector;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Supplier;
 
 /**
  * Operator to execute the actual compaction task assigned by the compaction 
plan task.
@@ -69,26 +56,6 @@ public class CompactOperator extends 
TableStreamOperator<CompactionCommitEvent>
    */
   private final Configuration conf;
 
-  /**
-   * Write Client.
-   */
-  private transient HoodieFlinkWriteClient<?> writeClient;
-
-  /**
-   * Hoodie Flink table.
-   */
-  private transient HoodieFlinkTable<?> flinkTable;
-
-  /**
-   * Whether to execute compaction asynchronously.
-   */
-  private final boolean asyncCompaction;
-
-  /**
-   * Id of current subtask.
-   */
-  private int taskID;
-
   /**
    * Executor service to execute the compaction task.
    */
@@ -109,19 +76,12 @@ public class CompactOperator extends 
TableStreamOperator<CompactionCommitEvent>
    */
   private transient String prevCompactInstant = "";
 
-  /**
-   * Whether FileGroup reader based compaction should be used;
-   */
-  private transient boolean useFileGroupReaderBasedCompaction;
+  private transient Lazy<CompactHandler> compactHandler;
 
-  /**
-   * InternalSchema manager used for handling schema evolution.
-   */
-  private transient InternalSchemaManager internalSchemaManager;
+  private transient Lazy<CompactHandler> mdtCompactHandler;
 
   public CompactOperator(Configuration conf) {
     this.conf = conf;
-    this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
   }
 
   /**
@@ -144,13 +104,16 @@ public class CompactOperator extends 
TableStreamOperator<CompactionCommitEvent>
 
   @Override
   public void open() throws Exception {
-    this.taskID = 
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
-    this.writeClient = FlinkWriteClients.createWriteClient(conf, 
getRuntimeContext());
-    this.flinkTable = this.writeClient.getHoodieTable();
-    if (this.asyncCompaction) {
+    // ID of current subtask
+    int taskID = 
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
+    HoodieFlinkWriteClient writeClient = 
FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
+    if (conf.get(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED)) {
+      // executes compaction asynchronously.
       this.executor = NonThrownExecutor.builder(log).build();
     }
     this.collector = new StreamRecordCollector<>(output);
+    this.compactHandler = Lazy.lazily(() -> new CompactHandler(writeClient, 
taskID));
+    this.mdtCompactHandler = Lazy.lazily(() -> new 
MetadataCompactHandler(StreamerUtil.createMetadataWriteClient(writeClient), 
taskID));
     registerMetrics();
   }
 
@@ -158,61 +121,14 @@ public class CompactOperator extends 
TableStreamOperator<CompactionCommitEvent>
   public void processElement(StreamRecord<CompactionPlanEvent> record) throws 
Exception {
     final CompactionPlanEvent event = record.getValue();
     final String instantTime = event.getCompactionInstantTime();
-    final CompactionOperation compactionOperation = event.getOperation();
     boolean needReloadMetaClient = !instantTime.equals(prevCompactInstant);
     prevCompactInstant = instantTime;
-    if (asyncCompaction) {
-      // executes the compaction task asynchronously to not block the 
checkpoint barrier propagate.
-      executor.execute(
-          () -> doCompaction(instantTime, compactionOperation, collector, 
writeClient.getConfig(), needReloadMetaClient),
-          (errMsg, t) -> collector.collect(new 
CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
-          "Execute compaction for instant %s from task %d", instantTime, 
taskID);
-    } else {
-      // executes the compaction task synchronously for batch mode.
-      log.info("Execute compaction for instant {} from task {}", instantTime, 
taskID);
-      doCompaction(instantTime, compactionOperation, collector, 
writeClient.getConfig(), needReloadMetaClient);
-    }
-  }
 
-  private void doCompaction(String instantTime,
-                            CompactionOperation compactionOperation,
-                            Collector<CompactionCommitEvent> collector,
-                            HoodieWriteConfig writeConfig,
-                            boolean needReloadMetaClient) throws Exception {
-    compactionMetrics.startCompaction();
-    HoodieFlinkMergeOnReadTableCompactor<?> compactor = new 
HoodieFlinkMergeOnReadTableCompactor<>();
-    HoodieTableMetaClient metaClient = flinkTable.getMetaClient();
-    if (needReloadMetaClient) {
-      // reload the timeline
-      metaClient.reload();
+    if (event.isMetadataTable()) {
+      mdtCompactHandler.get().compact(executor, event, collector, 
needReloadMetaClient, compactionMetrics);
+    } else {
+      compactHandler.get().compact(executor, event, collector, 
needReloadMetaClient, compactionMetrics);
     }
-    // schema evolution
-    CompactionUtil.setAvroSchema(writeConfig, metaClient);
-    List<WriteStatus> writeStatuses = compactor.compact(
-        writeConfig,
-        compactionOperation,
-        instantTime,
-        flinkTable.getTaskContextSupplier(),
-        createReaderContext(writeClient, needReloadMetaClient),
-        flinkTable);
-    compactionMetrics.endCompaction();
-    collector.collect(new CompactionCommitEvent(instantTime, 
compactionOperation.getFileId(), writeStatuses, taskID));
-  }
-
-  private HoodieReaderContext<?> createReaderContext(HoodieFlinkWriteClient<?> 
writeClient, boolean needReloadMetaClient) {
-    HoodieTableMetaClient metaClient = flinkTable.getMetaClient();
-    // CAUTION: InternalSchemaManager will scan timeline, reusing the meta 
client so that the timeline is updated.
-    // Instantiate internalSchemaManager lazily here since it may not be 
needed for FG reader, e.g., schema evolution
-    // for log files in FG reader do not use internalSchemaManager.
-    Supplier<InternalSchemaManager> internalSchemaManagerSupplier = () -> {
-      if (internalSchemaManager == null || needReloadMetaClient) {
-        internalSchemaManager = 
InternalSchemaManager.get(metaClient.getStorageConf(), metaClient);
-      }
-      return internalSchemaManager;
-    };
-    // initialize storage conf lazily.
-    StorageConfiguration<?> readerConf = 
writeClient.getEngineContext().getStorageConf();
-    return new FlinkRowDataReaderContext(readerConf, 
internalSchemaManagerSupplier, Collections.emptyList(), 
metaClient.getTableConfig(), Option.empty());
   }
 
   @VisibleForTesting
@@ -220,14 +136,21 @@ public class CompactOperator extends 
TableStreamOperator<CompactionCommitEvent>
     this.executor = executor;
   }
 
+  @VisibleForTesting
+  public void setOutput(Output<StreamRecord<CompactionCommitEvent>> output) {
+    this.collector = new StreamRecordCollector<>(output);
+  }
+
   @Override
   public void close() throws Exception {
     if (null != this.executor) {
       this.executor.close();
     }
-    if (null != this.writeClient) {
-      this.writeClient.close();
-      this.writeClient = null;
+    if (compactHandler.isInitialized()) {
+      compactHandler.get().close();
+    }
+    if (mdtCompactHandler.isInitialized()) {
+      mdtCompactHandler.get().close();
     }
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java
index b03448294bb8..0eae1fe37397 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java
@@ -57,11 +57,21 @@ public class CompactionCommitEvent implements Serializable {
    */
   private int taskID;
 
+  /**
+   * Whether the event is for metadata table.
+   */
+  private boolean isMetadataTable;
+
+  /**
+   * Whether the event is for log compaction.
+   */
+  private boolean isLogCompaction;
+
   /**
    * An event with NULL write statuses that represents a failed compaction.
    */
-  public CompactionCommitEvent(String instant, String fileId, int taskID) {
-    this(instant, fileId, null, taskID);
+  public CompactionCommitEvent(String instant, String fileId, int taskID, 
boolean isMetadataTable, boolean isLogCompaction) {
+    this(instant, fileId, null, taskID, isMetadataTable, isLogCompaction);
   }
 
   public boolean isFailed() {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index b6dbe8bfa11b..d704b21b9601 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -19,30 +19,20 @@
 package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieListData;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.util.CompactionUtils;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metrics.FlinkCompactionMetrics;
 import org.apache.hudi.sink.CleanFunction;
-import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.table.action.compact.CompactHelpers;
-import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.sink.compact.handler.CompactCommitHandler;
+import org.apache.hudi.sink.compact.handler.MetadataCompactCommitHandler;
 import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.util.StreamerUtil;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
 /**
  * Function to check and commit the compaction action.
  *
@@ -62,25 +52,9 @@ public class CompactionCommitSink extends 
CleanFunction<CompactionCommitEvent> {
    */
   private final Configuration conf;
 
-  /**
-   * Buffer to collect the event from each compact task {@code 
CompactFunction}.
-   *
-   * <p>Stores the mapping of instant_time -> file_id -> event. Use a map to 
collect the
-   * events because the rolling back of intermediate compaction tasks 
generates corrupt
-   * events.
-   */
-  private transient Map<String, Map<String, CompactionCommitEvent>> 
commitBuffer;
+  private transient Lazy<CompactCommitHandler> compactCommitHandler;
 
-  /**
-   * Cache to store compaction plan for each instant.
-   * Stores the mapping of instant_time -> compactionPlan.
-   */
-  private transient Map<String, HoodieCompactionPlan> compactionPlanCache;
-
-  /**
-   * The hoodie table.
-   */
-  private transient HoodieFlinkTable<?> table;
+  private transient Lazy<CompactCommitHandler> mdtCompactCommitHandler;
 
   /**
    * Compaction metrics.
@@ -95,12 +69,9 @@ public class CompactionCommitSink extends 
CleanFunction<CompactionCommitEvent> {
   @Override
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
-    if (writeClient == null) {
-      this.writeClient = FlinkWriteClients.createWriteClient(conf, 
getRuntimeContext());
-    }
-    this.commitBuffer = new HashMap<>();
-    this.compactionPlanCache = new HashMap<>();
-    this.table = this.writeClient.getHoodieTable();
+    HoodieFlinkWriteClient writeClient = 
FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
+    this.compactCommitHandler = Lazy.lazily(() -> new 
CompactCommitHandler(conf, writeClient));
+    this.mdtCompactCommitHandler = Lazy.lazily(() -> new 
MetadataCompactCommitHandler(conf, 
StreamerUtil.createMetadataWriteClient(writeClient)));
     registerMetrics();
   }
 
@@ -114,102 +85,31 @@ public class CompactionCommitSink extends 
CleanFunction<CompactionCommitEvent> {
               + " is failed: {}, error record count: {}",
           instant, event.getTaskID(), event.isFailed(), 
getNumErrorRecords(event));
     }
-    commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
-        .put(event.getFileId(), event);
-    commitIfNecessary(instant, commitBuffer.get(instant).values());
-  }
-
-  private long getNumErrorRecords(CompactionCommitEvent event) {
-    if (event.getWriteStatuses() == null) {
-      return -1L;
+    if (event.isMetadataTable()) {
+      mdtCompactCommitHandler.get().commitIfNecessary(event, 
compactionMetrics);
+    } else {
+      compactCommitHandler.get().commitIfNecessary(event, compactionMetrics);
     }
-    return event.getWriteStatuses().stream()
-        .map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
   }
 
-  /**
-   * Condition to commit: the commit buffer has equal size with the compaction 
plan operations
-   * and all the compact commit event {@link CompactionCommitEvent} has the 
same compaction instant time.
-   *
-   * @param instant Compaction commit instant time
-   * @param events  Commit events ever received for the instant
-   */
-  private void commitIfNecessary(String instant, 
Collection<CompactionCommitEvent> events) throws IOException {
-    HoodieCompactionPlan compactionPlan = 
compactionPlanCache.computeIfAbsent(instant, k -> {
-      try {
-        return CompactionUtils.getCompactionPlan(
-            this.writeClient.getHoodieTable().getMetaClient(), instant);
-      } catch (Exception e) {
-        throw new HoodieException(e);
-      }
-    });
-
-    boolean isReady = compactionPlan.getOperations().size() == events.size();
-    if (!isReady) {
-      return;
-    }
-
-    if (events.stream().anyMatch(CompactionCommitEvent::isFailed)) {
-      try {
-        // handle failure case
-        CompactionUtil.rollbackCompaction(table, instant, 
writeClient.getTransactionManager());
-      } finally {
-        // remove commitBuffer to avoid obsolete metadata commit
-        reset(instant);
-        this.compactionMetrics.markCompactionRolledBack();
-      }
-      return;
+  @Override
+  public void close() throws Exception {
+    if (compactCommitHandler.isInitialized()) {
+      compactCommitHandler.get().close();
     }
+    if (mdtCompactCommitHandler.isInitialized()) {
+      mdtCompactCommitHandler.get().close();
 
-    try {
-      doCommit(instant, events);
-    } catch (Throwable throwable) {
-      // make it fail-safe
-      log.error("Error while committing compaction instant: " + instant, 
throwable);
-      this.compactionMetrics.markCompactionRolledBack();
-    } finally {
-      // reset the status
-      reset(instant);
     }
+    super.close();
   }
 
-  @SuppressWarnings("unchecked")
-  private void doCommit(String instant, Collection<CompactionCommitEvent> 
events) throws IOException {
-    List<WriteStatus> statuses = events.stream()
-        .map(CompactionCommitEvent::getWriteStatuses)
-        .flatMap(Collection::stream)
-        .collect(Collectors.toList());
-
-    long numErrorRecords = 
statuses.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
-
-    if (numErrorRecords > 0 && !this.conf.get(FlinkOptions.IGNORE_FAILED)) {
-      // handle failure case
-      log.error("Got {} error records during compaction of instant {},\n"
-          + "option '{}' is configured as false,"
-          + "rolls back the compaction", numErrorRecords, instant, 
FlinkOptions.IGNORE_FAILED.key());
-      CompactionUtil.rollbackCompaction(table, instant, 
writeClient.getTransactionManager());
-      this.compactionMetrics.markCompactionRolledBack();
-      return;
-    }
-
-    HoodieCommitMetadata metadata = 
CompactHelpers.getInstance().createCompactionMetadata(
-        table, instant, HoodieListData.eager(statuses), 
writeClient.getConfig().getSchema());
-
-    // commit the compaction
-    this.writeClient.completeCompaction(metadata, table, instant);
-
-    this.compactionMetrics.updateCommitMetrics(instant, metadata);
-    this.compactionMetrics.markCompactionCompleted();
-
-    // Whether to clean up the old log file when compaction
-    if (!conf.get(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
-      this.writeClient.clean();
+  private long getNumErrorRecords(CompactionCommitEvent event) {
+    if (event.getWriteStatuses() == null) {
+      return -1L;
     }
-  }
-
-  private void reset(String instant) {
-    this.commitBuffer.remove(instant);
-    this.compactionPlanCache.remove(instant);
+    return event.getWriteStatuses().stream()
+        .map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
   }
 
   private void registerMetrics() {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanEvent.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanEvent.java
index 8e8228541c77..19ec2a3d0729 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanEvent.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanEvent.java
@@ -20,7 +20,6 @@ package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.common.model.CompactionOperation;
 
-import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
@@ -30,7 +29,6 @@ import java.io.Serializable;
 /**
  * Represents a compact command from the compaction plan task {@link 
CompactionPlanOperator}.
  */
-@AllArgsConstructor
 @NoArgsConstructor
 @Getter
 @Setter
@@ -43,8 +41,23 @@ public class CompactionPlanEvent implements Serializable {
 
   private int index;
 
+  private boolean isMetadataTable;
+
+  private boolean isLogCompaction;
+
   public CompactionPlanEvent(String instantTime, CompactionOperation 
operation) {
+    this(instantTime, operation, 0);
+  }
+
+  public CompactionPlanEvent(String instantTime, CompactionOperation 
operation, int index) {
+    this(instantTime, operation, index, false, false);
+  }
+
+  public CompactionPlanEvent(String instantTime, CompactionOperation 
operation, int index, boolean isMetadataTable, boolean isLogCompaction) {
     this.compactionInstantTime = instantTime;
     this.operation = operation;
+    this.index = index;
+    this.isMetadataTable = isMetadataTable;
+    this.isLogCompaction = isLogCompaction;
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 1ef6278fab6d..3fcc8dcfa61e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -18,19 +18,14 @@
 
 package org.apache.hudi.sink.compact;
 
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.common.model.CompactionOperation;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.metrics.FlinkCompactionMetrics;
-import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.table.marker.WriteMarkersFactory;
-import org.apache.hudi.util.CompactionUtil;
-import org.apache.hudi.util.FlinkTables;
+import org.apache.hudi.sink.compact.handler.CompactionPlanHandler;
+import org.apache.hudi.sink.compact.handler.MetadataCompactionPlanHandler;
 import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -46,13 +41,6 @@ import 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.table.data.RowData;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static java.util.stream.Collectors.toList;
-
 /**
  * Operator that generates the compaction plan with pluggable strategies on 
finished checkpoints.
  *
@@ -67,15 +55,11 @@ public class CompactionPlanOperator extends 
AbstractStreamOperator<CompactionPla
    */
   private final Configuration conf;
 
-  /**
-   * Meta Client.
-   */
-  @SuppressWarnings("rawtypes")
-  private transient HoodieFlinkTable table;
-
   private transient FlinkCompactionMetrics compactionMetrics;
 
-  private transient HoodieFlinkWriteClient writeClient;
+  private transient Option<CompactionPlanHandler> compactionPlanHandler;
+
+  private transient Option<CompactionPlanHandler> mdtCompactionPlanHandler;
 
   public CompactionPlanOperator(Configuration conf) {
     this.conf = conf;
@@ -85,12 +69,17 @@ public class CompactionPlanOperator extends 
AbstractStreamOperator<CompactionPla
   public void open() throws Exception {
     super.open();
     registerMetrics();
-    this.table = FlinkTables.createTable(conf, getRuntimeContext());
-    this.writeClient = FlinkWriteClients.createWriteClient(conf, 
getRuntimeContext());
+    HoodieFlinkWriteClient writeClient = 
FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
+    this.compactionPlanHandler = OptionsResolver.needsAsyncCompaction(conf)
+        ? Option.of(new CompactionPlanHandler(writeClient)) : Option.empty();
+    this.mdtCompactionPlanHandler = 
OptionsResolver.needsAsyncMetadataCompaction(conf)
+        ? Option.of(new 
MetadataCompactionPlanHandler(StreamerUtil.createMetadataWriteClient(writeClient)))
 : Option.empty();
+
     // when starting up, rolls back all the inflight compaction instants if 
there exists,
     // these instants are in priority for scheduling task because the 
compaction instants are
     // scheduled from earliest(FIFO sequence).
-    CompactionUtil.rollbackCompaction(table, this.writeClient);
+    
this.compactionPlanHandler.ifPresent(CompactionPlanHandler::rollbackCompaction);
+    
this.mdtCompactionPlanHandler.ifPresent(CompactionPlanHandler::rollbackCompaction);
   }
 
   /**
@@ -118,76 +107,12 @@ public class CompactionPlanOperator extends 
AbstractStreamOperator<CompactionPla
 
   @Override
   public void notifyCheckpointComplete(long checkpointId) {
-    try {
-      table.getMetaClient().reloadActiveTimeline();
-      // There is no good way to infer when the compaction task for an instant 
crushed
-      // or is still undergoing. So we use a configured timeout threshold to 
control the rollback:
-      // {@code FlinkOptions.COMPACTION_TIMEOUT_SECONDS},
-      // when the earliest inflight instant has timed out, assumes it has 
failed
-      // already and just rolls it back.
-
-      // comment out: do we really need the timeout rollback ?
-      // CompactionUtil.rollbackEarliestCompaction(table, conf);
-      scheduleCompaction(table, checkpointId);
-    } catch (Throwable throwable) {
-      // make it fail-safe
-      log.error("Error while scheduling compaction plan for checkpoint: " + 
checkpointId, throwable);
-    }
-  }
-
-  private void scheduleCompaction(HoodieFlinkTable<?> table, long 
checkpointId) throws IOException {
-    HoodieTimeline pendingCompactionTimeline = 
table.getActiveTimeline().filterPendingCompactionTimeline();
-
-    // the first instant takes the highest priority.
-    Option<HoodieInstant> firstRequested = pendingCompactionTimeline
-        .filter(instant -> instant.getState() == 
HoodieInstant.State.REQUESTED).firstInstant();
-    // record metrics
-    compactionMetrics.setFirstPendingCompactionInstant(firstRequested);
-    
compactionMetrics.setPendingCompactionCount(pendingCompactionTimeline.countInstants());
-
-    if (!firstRequested.isPresent()) {
-      // do nothing.
-      log.info("No compaction plan for checkpoint " + checkpointId);
-      return;
-    }
-
-    String compactionInstantTime = firstRequested.get().requestedTime();
-
-    // generate compaction plan
-    // should support configurable commit metadata
-    HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
-        table.getMetaClient(), compactionInstantTime);
-
-    if (compactionPlan == null || (compactionPlan.getOperations() == null)
-        || (compactionPlan.getOperations().isEmpty())) {
-      // do nothing.
-      log.info("Empty compaction plan for instant " + compactionInstantTime);
-    } else {
-      HoodieInstant instant = 
table.getInstantGenerator().getCompactionRequestedInstant(compactionInstantTime);
-      // Mark instant as compaction inflight
-      
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
-      table.getMetaClient().reloadActiveTimeline();
-
-      List<CompactionOperation> operations = 
compactionPlan.getOperations().stream()
-          
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
-      log.info("Execute compaction plan for instant {} as {} file groups", 
compactionInstantTime, operations.size());
-      WriteMarkersFactory
-          .get(table.getConfig().getMarkersType(), table, 
compactionInstantTime)
-          .deleteMarkerDir(table.getContext(), 
table.getConfig().getMarkersDeleteParallelism());
-      Map<String, Integer> fileIdIndexMap = new HashMap<>();
-      int index = 0;
-      for (CompactionOperation operation : operations) {
-        int operationIndex;
-        if (fileIdIndexMap.containsKey(operation.getFileId())) {
-          operationIndex = fileIdIndexMap.get(operation.getFileId());
-        } else {
-          operationIndex = index;
-          fileIdIndexMap.put(operation.getFileId(), operationIndex);
-          index++;
-        }
-        output.collect(new StreamRecord<>(new 
CompactionPlanEvent(compactionInstantTime, operation, operationIndex)));
-      }
-    }
+    // comment out: do we really need the timeout rollback ?
+    // CompactionUtil.rollbackEarliestCompaction(table, conf);
+    // schedule data table compaction if enabled
+    this.compactionPlanHandler.ifPresent(handler -> 
handler.collectCompactionOperations(checkpointId, compactionMetrics, output));
+    // Also schedule metadata table compaction if enabled
+    this.mdtCompactionPlanHandler.ifPresent(handler -> 
handler.collectCompactionOperations(checkpointId, compactionMetrics, output));
   }
 
   @VisibleForTesting
@@ -195,6 +120,13 @@ public class CompactionPlanOperator extends 
AbstractStreamOperator<CompactionPla
     this.output = output;
   }
 
+  @Override
+  public void close() throws Exception {
+    this.compactionPlanHandler.ifPresent(CompactionPlanHandler::close);
+    this.mdtCompactionPlanHandler.ifPresent(CompactionPlanHandler::close);
+    super.close();
+  }
+
   @Override
   public void endInput() throws Exception {
     // Called when the input data ends, only used in batch mode.
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
index 908b48e31d14..ca547c50e195 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -213,7 +213,7 @@ public class FlinkCompactionConfig extends Configuration {
     conf.set(FlinkOptions.COMPACTION_TASKS, config.compactionTasks);
     conf.set(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable);
     // use synchronous compaction always
-    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED, false);
     conf.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, config.schedule);
     // Map memory
     conf.setString(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), 
config.spillableMapPath);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CleanHandler.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CleanHandler.java
new file mode 100644
index 000000000000..c46b18bc5dac
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CleanHandler.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.handler;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.utils.NonThrownExecutor;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Closeable;
+
+/**
+ * Handler for managing table cleaning operations in clean operator.
+ *
+ * <p>This class provides mechanisms to execute cleaning operations 
asynchronously,
+ * ensuring proper coordination with Flink's checkpointing mechanism.
+ *
+ * <p>The handler uses a {@link NonThrownExecutor} to manage cleaning tasks 
and tracks
+ * the cleaning state to prevent concurrent cleaning operations.
+ */
+@Slf4j
+public class CleanHandler implements Closeable {
+  private final HoodieFlinkWriteClient writeClient;
+  private volatile boolean isCleaning;
+  private final NonThrownExecutor executor;
+
+  public CleanHandler(HoodieFlinkWriteClient writeClient) {
+    this.writeClient = writeClient;
+    this.executor = 
NonThrownExecutor.builder(log).waitForTasksFinish(true).build();
+  }
+
+  /**
+   * Executes a synchronous cleaning operation.
+   *
+   * <p>The actual cleaning is performed by the underlying write client, which 
removes
+   * old file versions based on the configured retention policy.
+   */
+  public void clean() {
+    executor.execute(() -> {
+      this.isCleaning = true;
+      try {
+        this.writeClient.clean();
+      } finally {
+        this.isCleaning = false;
+      }
+    }, "wait for cleaning finish");
+  }
+
+  /**
+   * Waits for an ongoing cleaning operation to finish.
+   *
+   * <p>If no cleaning operation is in progress, this method returns 
immediately
+   * without performing any action.
+   */
+  public void waitForCleaningFinish() {
+    if (isCleaning) {
+      executor.execute(() -> {
+        try {
+          this.writeClient.waitForCleaningFinish();
+        } finally {
+          // ensure to switch the isCleaning flag
+          this.isCleaning = false;
+        }
+      }, "wait for cleaning finish");
+    }
+  }
+
+  /**
+   * Starts an asynchronous cleaning operation.
+   *
+   * <p>Any exceptions thrown during the start of async cleaning are caught 
and logged
+   * as warnings to prevent interference with normal Flink checkpointing 
operations.
+   * This ensures that cleaning failures do not disrupt the streaming job's 
progress.
+   */
+  public void startAsyncCleaning() {
+    if (!isCleaning) {
+      try {
+        this.writeClient.startAsyncCleaning();
+        this.isCleaning = true;
+      } catch (Throwable throwable) {
+        // catch the exception to not affect the normal checkpointing
+        log.warn("Failed to start async cleaning", throwable);
+      }
+    }
+  }
+
+  /**
+   * Closes the CleanHandler and releases associated resources.
+   */
+  @Override
+  public void close() {
+    if (executor != null) {
+      try {
+        executor.close();
+      } catch (Exception e) {
+        throw new HoodieException("Failed to close executor of clean 
handler.", e);
+      }
+    }
+    this.writeClient.clean();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactCommitHandler.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactCommitHandler.java
new file mode 100644
index 000000000000..175e0c3329b0
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactCommitHandler.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.handler;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
+import org.apache.hudi.sink.compact.CompactionCommitEvent;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.action.compact.CompactHelpers;
+import org.apache.hudi.util.CompactionUtil;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Handler for committing compaction operations in compaction sub-pipeline.
+ *
+ * <p>The responsibilities:
+ * <ul>
+ *   <li>Buffers compaction commit events from multiple parallel tasks;</li>
+ *   <li>Determines whether all compaction operations for an instant are 
collected as complete;</li>
+ *   <li>Commits the compaction to the timeline;</li>
+ *   <li>Rolls back failed compactions;</li>
+ *   <li>Triggers cleaning operations after successful compaction.</li>
+ * </ul>
+ *
+ * <p>The handler uses a commit buffer to collect events from all parallel 
compaction tasks.
+ * Once all operations for a compaction instant are complete, it validates the 
results and
+ * either commits the compaction or rolls it back based on the success/failure 
status.
+ *
+ * <p>The commit condition is met when the commit buffer has the same number 
of events as
+ * the compaction plan operations, and all events share the same compaction 
instant time.
+ *
+ * @see CompactionCommitEvent
+ * @see HoodieCompactionPlan
+ */
+@Slf4j
+public class CompactCommitHandler implements Closeable {
+  protected final HoodieFlinkTable table;
+  protected final HoodieFlinkWriteClient writeClient;
+  protected final Configuration conf;
+  /**
+   * Buffer to collect the event from each compact task {@code 
CompactFunction}.
+   *
+   * <p>Stores the mapping of instant_time -> file_id -> event. Use a map to 
collect the
+   * events because the rolling back of intermediate compaction tasks 
generates corrupt
+   * events.
+   */
+  private transient Map<String, Map<String, CompactionCommitEvent>> 
commitBuffer;
+
+  /**
+   * Cache to store compaction plan for each instant.
+   * Stores the mapping of instant_time -> compactionPlan.
+   */
+  protected transient Map<String, HoodieCompactionPlan> compactionPlanCache;
+
+  public CompactCommitHandler(Configuration conf, HoodieFlinkWriteClient 
writeClient) {
+    this.conf = conf;
+    this.table = writeClient.getHoodieTable();
+    this.writeClient = writeClient;
+    this.commitBuffer = new HashMap<>();
+    this.compactionPlanCache = new HashMap<>();
+  }
+
+  /**
+   * Commits the compaction if all operations for the instant are complete.
+   *
+   * <p>Condition to commit: the commit buffer has equal size with the 
compaction plan operations
+   * and all the compact commit event {@link CompactionCommitEvent} has the 
same compaction instant time.
+   *
+   * @param event             The compaction commit event
+   * @param compactionMetrics Metrics collector for tracking compaction 
progress
+   */
+  public void commitIfNecessary(CompactionCommitEvent event, 
FlinkCompactionMetrics compactionMetrics) {
+    String instant = event.getInstant();
+    commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
+        .put(event.getFileId(), event);
+
+    boolean isLogCompaction = event.isLogCompaction();
+    HoodieCompactionPlan compactionPlan = getCompactionPlan(instant, 
isLogCompaction);
+    Collection<CompactionCommitEvent> events = 
commitBuffer.get(instant).values();
+
+    boolean isReady = compactionPlan.getOperations().size() == events.size();
+    if (!isReady) {
+      return;
+    }
+
+    if (events.stream().anyMatch(CompactionCommitEvent::isFailed)) {
+      try {
+        // handle the failure case
+        rollbackCompaction(instant, isLogCompaction);
+      } finally {
+        // remove commitBuffer to avoid obsolete metadata commit
+        reset(instant);
+        compactionMetrics.markCompactionRolledBack();
+      }
+      return;
+    }
+
+    try {
+      doCommit(instant, isLogCompaction, events, compactionMetrics);
+    } catch (Throwable throwable) {
+      // make it fail-safe
+      log.error("Error while committing compaction instant: {}", instant, 
throwable);
+      compactionMetrics.markCompactionRolledBack();
+    } finally {
+      // reset the status
+      reset(instant);
+    }
+  }
+
+  /**
+   * Performs the actual commit operation for a compaction instant.
+   *
+   * <p>This method aggregates write statuses from all compaction events, 
checks for errors,
+   * and either completes the compaction or rolls it back based on the error 
count and
+   * configuration. If successful and cleaning is enabled, it triggers a 
cleaning operation.
+   *
+   * @param instant           The compaction instant time
+   * @param isLogCompaction   Whether the compaction is log compaction
+   * @param events            All compaction commit events for this instant
+   * @param compactionMetrics Metrics collector for tracking compaction 
progress
+   * @throws IOException      If an I/O error occurs during commit
+   */
+  @SuppressWarnings("unchecked")
+  private void doCommit(
+      String instant,
+      boolean isLogCompaction,
+      Collection<CompactionCommitEvent> events,
+      FlinkCompactionMetrics compactionMetrics) throws IOException {
+    List<WriteStatus> statuses = events.stream()
+        .map(CompactionCommitEvent::getWriteStatuses)
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
+
+    long numErrorRecords = 
statuses.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
+
+    if (numErrorRecords > 0 && !this.conf.get(FlinkOptions.IGNORE_FAILED)) {
+      // handle failure case
+      log.error("Got {} error records during compaction of instant {},\n"
+          + "option '{}' is configured as false,"
+          + "rolls back the compaction", numErrorRecords, instant, 
FlinkOptions.IGNORE_FAILED.key());
+      rollbackCompaction(instant, isLogCompaction);
+      compactionMetrics.markCompactionRolledBack();
+      return;
+    }
+
+    // complete the compaction
+    completeCompaction(instant, isLogCompaction, statuses, compactionMetrics);
+
+    // Whether to clean up the old log file when compaction
+    if (!conf.get(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+      writeClient.clean();
+    }
+  }
+
+  /**
+   * Rolls back a failed compaction operation.
+   *
+   * @param instant         The compaction instant time
+   * @param isLogCompaction Whether the compaction is log compaction
+   */
+  protected void rollbackCompaction(String instant, boolean isLogCompaction) {
+    CompactionUtil.rollbackCompaction(table, instant, 
writeClient.getTransactionManager());
+  }
+
+  /**
+   * Completes a successful compaction operation by creating metadata and 
committing to the timeline.
+   *
+   * <p>This method creates compaction metadata from the write statuses, 
commits the compaction
+   * to the timeline, and updates compaction metrics.
+   *
+   * @param instant           The compaction instant time
+   * @param isLogCompaction   Whether the compaction is log compaction
+   * @param statuses          List of write statuses from all compaction 
operations
+   * @param compactionMetrics Metrics collector for tracking compaction 
progress
+   * @throws IOException      If an I/O error occurs during completion
+   */
+  protected void completeCompaction(String instant,
+                                    boolean isLogCompaction,
+                                    List<WriteStatus> statuses,
+                                    FlinkCompactionMetrics compactionMetrics) 
throws IOException {
+    HoodieCommitMetadata metadata = 
CompactHelpers.getInstance().createCompactionMetadata(
+        table, instant, HoodieListData.eager(statuses), 
writeClient.getConfig().getSchema());
+    writeClient.completeCompaction(metadata, table, instant);
+    compactionMetrics.updateCommitMetrics(instant, metadata);
+    compactionMetrics.markCompactionCompleted();
+  }
+
+  /**
+   * Retrieves the compaction plan for a given instant.
+   *
+   * <p>This method uses a cache to avoid repeatedly reading the compaction 
plan from storage.
+   * If the plan is not in the cache, it reads it from the timeline and caches 
it.
+   *
+   * @param instant         The compaction instant time
+   * @param isLogCompaction Whether the compaction is log compaction
+   *
+   * @return The compaction plan for the instant
+   */
+  protected HoodieCompactionPlan getCompactionPlan(String instant, boolean 
isLogCompaction) {
+    return compactionPlanCache.computeIfAbsent(instant, k -> {
+      try {
+        return CompactionUtils.getCompactionPlan(
+            this.writeClient.getHoodieTable().getMetaClient(), instant);
+      } catch (Exception e) {
+        throw new HoodieException(e);
+      }
+    });
+  }
+
+  /**
+   * Resets the internal state for a completed or failed compaction instant.
+   *
+   * @param instant The compaction instant time to reset
+   */
+  private void reset(String instant) {
+    this.commitBuffer.remove(instant);
+    this.compactionPlanCache.remove(instant);
+  }
+
+  @Override
+  public void close() {
+    writeClient.close();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactHandler.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactHandler.java
new file mode 100644
index 000000000000..e6bda07a47f8
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactHandler.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.handler;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
+import org.apache.hudi.sink.compact.CompactionCommitEvent;
+import org.apache.hudi.sink.compact.CompactionPlanEvent;
+import org.apache.hudi.sink.utils.NonThrownExecutor;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.table.HoodieFlinkTable;
+import 
org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
+import org.apache.hudi.table.format.FlinkRowDataReaderContext;
+import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.util.CompactionUtil;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.util.Collector;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Handler for executing compaction operations in compaction sub-pipeline.
+ *
+ * <p>The responsibilities:
+ * <ul>
+ *   <li>Executes compaction tasks for individual file groups;</li>
+ *   <li>Handles schema evolution during compaction;</li>
+ *   <li>Collects compaction metadata and generates commit events {@code 
CompactionCommitEvent}.</li>
+ * </ul>
+ *
+ * <p>The handler supports both synchronous and asynchronous compaction 
execution.
+ * It uses {@link HoodieFlinkMergeOnReadTableCompactor} to perform the actual 
compaction
+ * operations.
+ *
+ * @see CompactionPlanEvent
+ * @see CompactionCommitEvent
+ * @see HoodieFlinkMergeOnReadTableCompactor
+ */
+@Slf4j
+public class CompactHandler implements Closeable {
+  protected final HoodieFlinkTable table;
+  protected final HoodieFlinkWriteClient writeClient;
+  protected final int taskID;
+  /**
+   * InternalSchema manager used for handling schema evolution.
+   */
+  private transient InternalSchemaManager internalSchemaManager;
+
+  public CompactHandler(HoodieFlinkWriteClient writeClient, int taskId) {
+    this.table = writeClient.getHoodieTable();
+    this.writeClient = writeClient;
+    this.taskID = taskId;
+  }
+
+  /**
+   * Executes a compaction operation for a single file group.
+   *
+   * <p>This method supports both asynchronous and synchronous execution. When 
an executor
+   * is provided, the compaction runs asynchronously with error handling. 
Otherwise, it
+   * runs synchronously.
+   *
+   * @param executor             The executor for asynchronous execution, or 
null for synchronous execution
+   * @param event                The compaction plan event containing 
operation details
+   * @param collector            The collector for emitting compaction commit 
events
+   * @param needReloadMetaClient Whether to reload the meta client before 
compaction
+   * @param compactionMetrics    Metrics collector for compaction operations
+   * @throws Exception           If compaction fails
+   */
+  public void compact(@Nullable NonThrownExecutor executor,
+                      CompactionPlanEvent event,
+                      Collector<CompactionCommitEvent> collector,
+                      boolean needReloadMetaClient,
+                      FlinkCompactionMetrics compactionMetrics) throws 
Exception {
+    String instantTime = event.getCompactionInstantTime();
+    if (executor != null) {
+      executor.execute(
+          () -> doCompaction(event, collector, needReloadMetaClient, 
compactionMetrics),
+          (errMsg, t) -> collector.collect(createFailedCommitEvent(event)),
+          "Execute compaction for instant %s from task %d", instantTime, 
taskID);
+    } else {
+      // executes the compaction task synchronously for batch mode.
+      log.info("Execute compaction for instant {} from task {}", instantTime, 
taskID);
+      doCompaction(event, collector, needReloadMetaClient, compactionMetrics);
+    }
+  }
+
+  /**
+   * Performs the actual compaction operation.
+   *
+   * @param event                The compaction plan event containing 
operation details
+   * @param collector            The collector for emitting compaction commit 
events
+   * @param needReloadMetaClient Whether to reload the meta client before 
compaction
+   * @param compactionMetrics    Metrics collector for compaction operations
+   * @throws Exception           If compaction fails
+   */
+  protected void doCompaction(CompactionPlanEvent event,
+                              Collector<CompactionCommitEvent> collector,
+                              boolean needReloadMetaClient,
+                              FlinkCompactionMetrics compactionMetrics) throws 
Exception {
+    compactionMetrics.startCompaction();
+    HoodieFlinkMergeOnReadTableCompactor<?> compactor = new 
HoodieFlinkMergeOnReadTableCompactor<>();
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    if (needReloadMetaClient) {
+      // reload the timeline
+      metaClient.reload();
+    }
+    // schema evolution
+    CompactionUtil.setAvroSchema(writeClient.getConfig(), metaClient);
+    List<WriteStatus> writeStatuses = compactor.compact(
+        writeClient.getConfig(),
+        event.getOperation(),
+        event.getCompactionInstantTime(),
+        table.getTaskContextSupplier(),
+        createReaderContext(needReloadMetaClient),
+        table);
+    compactionMetrics.endCompaction();
+    collector.collect(createCommitEvent(event, writeStatuses));
+  }
+
+  /**
+   * Creates a compaction commit event for successful compaction.
+   *
+   * @param event         The compaction plan event
+   * @param writeStatuses The write statuses from the compaction operation
+   * @return a new compaction commit event with the write statuses
+   */
+  protected CompactionCommitEvent createCommitEvent(CompactionPlanEvent event, 
List<WriteStatus> writeStatuses) {
+    return new CompactionCommitEvent(event.getCompactionInstantTime(), 
event.getOperation().getFileId(), writeStatuses, taskID, 
event.isMetadataTable(), event.isLogCompaction());
+  }
+
+  /**
+   * Creates a compaction commit event for failed compaction.
+   *
+   * @param event the compaction plan event
+   * @return a new compaction commit event marked as failed
+   */
+  private CompactionCommitEvent createFailedCommitEvent(CompactionPlanEvent 
event) {
+    return new CompactionCommitEvent(event.getCompactionInstantTime(), 
event.getOperation().getFileId(), taskID, event.isMetadataTable(), 
event.isLogCompaction());
+  }
+
+  /**
+   * Creates a reader context for reading data during compaction.
+   *
+   * <p>The internal schema manager is instantiated lazily and reused across 
operations
+   * unless the meta client needs to be reloaded.
+   *
+   * @param needReloadMetaClient whether to reload the meta client and schema 
manager
+   * @return a new Flink row data reader context
+   */
+  protected HoodieReaderContext<?> createReaderContext(boolean 
needReloadMetaClient) {
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    // CAUTION: InternalSchemaManager will scan timeline, reusing the meta 
client so that the timeline is updated.
+    // Instantiate internalSchemaManager lazily here since it may not be 
needed for FG reader, e.g., schema evolution
+    // for log files in FG reader do not use internalSchemaManager.
+    Supplier<InternalSchemaManager> internalSchemaManagerSupplier = () -> {
+      if (internalSchemaManager == null || needReloadMetaClient) {
+        internalSchemaManager = 
InternalSchemaManager.get(metaClient.getStorageConf(), metaClient);
+      }
+      return internalSchemaManager;
+    };
+    // initialize storage conf lazily.
+    StorageConfiguration<?> readerConf = 
writeClient.getEngineContext().getStorageConf();
+    return new FlinkRowDataReaderContext(readerConf, 
internalSchemaManagerSupplier, Collections.emptyList(), 
metaClient.getTableConfig(), Option.empty());
+  }
+
+  /**
+   * Closes the handler and releases resources.
+   *
+   * <p>This method closes the write client and any associated resources.
+   */
+  @Override
+  public void close() {
+    this.writeClient.close();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactionPlanHandler.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactionPlanHandler.java
new file mode 100644
index 000000000000..04264a723f1a
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactionPlanHandler.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.handler;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
+import org.apache.hudi.sink.compact.CompactionPlanEvent;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+import org.apache.hudi.util.CompactionUtil;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Handler for scheduling and managing compaction plans in compaction 
sub-pipeline.
+ *
+ * <p>The responsibilities:
+ * <ul>
+ *   <li>Retrieves pending compaction plans from the timeline;</li>
+ *   <li>Transitions compaction instants from REQUESTED to INFLIGHT state;</li>
+ *   <li>Distributes compaction operations to downstream tasks;</li>
+ *   <li>Rolls back failed compactions.</li>
+ * </ul>
+ *
+ * <p>The handler reads the Hudi timeline to detect the target pending 
compaction
+ * instant and generates compaction plan events for each compaction plan 
operation.
+ *
+ * @see CompactionPlanEvent
+ * @see HoodieFlinkWriteClient
+ */
+@Slf4j
+public class CompactionPlanHandler implements Closeable {
+  protected final HoodieFlinkTable table;
+  protected final HoodieFlinkWriteClient writeClient;
+
+  /**
+   * Constructs a new CompactionPlanHandler.
+   *
+   * @param writeClient the Hudi Flink write client for table operations
+   */
+  public CompactionPlanHandler(HoodieFlinkWriteClient writeClient) {
+    this.table = writeClient.getHoodieTable();
+    this.writeClient = writeClient;
+  }
+
+  /**
+   * Retrieve the first pending compaction plan and distribute compaction 
operations to downstream tasks.
+   *
+   * @param checkpointId      The Flink checkpoint ID
+   * @param compactionMetrics Metrics collector for compaction operations
+   * @param output            The output collector for emitting compaction 
plan events
+   */
+  public void collectCompactionOperations(
+      long checkpointId,
+      FlinkCompactionMetrics compactionMetrics,
+      Output<StreamRecord<CompactionPlanEvent>> output) {
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    metaClient.reloadActiveTimeline();
+
+    HoodieTimeline pendingCompactionTimeline = 
metaClient.getActiveTimeline().filterPendingCompactionTimeline();
+    Option<Pair<String, HoodieCompactionPlan>> instantAndPlanOpt =
+        getCompactionPlan(metaClient, pendingCompactionTimeline, checkpointId, 
compactionMetrics, CompactionUtils::getCompactionPlan);
+    if (instantAndPlanOpt.isEmpty()) {
+      return;
+    }
+    doCollectCompactionOperations(instantAndPlanOpt.get().getLeft(), 
instantAndPlanOpt.get().getRight(), output);
+  }
+
+  /**
+   * Rolls back any pending compaction operations.
+   *
+   * <p>This method is typically called during failure recovery to clean up
+   * incomplete compaction attempts.
+   */
+  public void rollbackCompaction() {
+    CompactionUtil.rollbackCompaction(table, this.writeClient);
+  }
+
+  /**
+   * Retrieves the first pending compaction plan from the timeline.
+   *
+   * @param metaClient                The table meta client
+   * @param pendingCompactionTimeline The timeline containing pending 
compaction instants
+   * @param checkpointId              The Flink checkpoint ID
+   * @param compactionMetrics         Metrics collector for compaction 
operations
+   * @param planGenerator             Function to generate the compaction plan 
from meta client and instant
+   * @return an optional pair of instant time and compaction plan, empty if no 
valid plan exists
+   */
+  protected Option<Pair<String, HoodieCompactionPlan>> getCompactionPlan(
+      HoodieTableMetaClient metaClient,
+      HoodieTimeline pendingCompactionTimeline,
+      long checkpointId,
+      FlinkCompactionMetrics compactionMetrics,
+      BiFunction<HoodieTableMetaClient, String, HoodieCompactionPlan> 
planGenerator) {
+    // the first instant takes the highest priority.
+    Option<HoodieInstant> firstRequested = pendingCompactionTimeline
+        .filter(instant -> instant.getState() == 
HoodieInstant.State.REQUESTED).firstInstant();
+    // record metrics
+    compactionMetrics.setFirstPendingCompactionInstant(firstRequested);
+    
compactionMetrics.setPendingCompactionCount(pendingCompactionTimeline.countInstants());
+
+    if (firstRequested.isEmpty()) {
+      log.info("No compaction plan for checkpoint {}", checkpointId);
+      return Option.empty();
+    }
+
+    String compactionInstantTime = firstRequested.get().requestedTime();
+    // generate compaction plan
+    // should support configurable commit metadata
+    HoodieCompactionPlan compactionPlan = planGenerator.apply(metaClient, 
compactionInstantTime);
+    if (compactionPlan == null || (compactionPlan.getOperations() == null)
+        || (compactionPlan.getOperations().isEmpty())) {
+      log.info("Empty compaction plan for instant {}", compactionInstantTime);
+      return Option.empty();
+    }
+    return Option.of(Pair.of(compactionInstantTime, compactionPlan));
+  }
+
+  /**
+   * Collects and distributes compaction operations for a Hudi table.
+   *
+   * <p>This method:
+   * <ul>
+   *   <li>Transitions the compaction instant from REQUESTED to INFLIGHT;</li>
+   *   <li>Deletes any existing marker directories;</li>
+   *   <li>Creates a compaction plan event for each operation;</li>
+   *   <li>Assigns operation indices to ensure proper task distribution;</li>
+   * </ul>
+   *
+   * @param compactionInstantTime The instant time for this compaction
+   * @param compactionPlan        The compaction plan containing operations to 
execute
+   * @param output                The output collector for emitting compaction 
plan events
+   */
+  protected void doCollectCompactionOperations(
+      String compactionInstantTime,
+      HoodieCompactionPlan compactionPlan,
+      Output<StreamRecord<CompactionPlanEvent>> output) {
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    // Mark instant as compaction inflight
+    HoodieInstant instant = 
metaClient.getInstantGenerator().getCompactionRequestedInstant(compactionInstantTime);
+    
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
+    metaClient.reloadActiveTimeline();
+
+    List<CompactionOperation> operations = 
compactionPlan.getOperations().stream()
+        
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
+    log.info("Execute compaction plan for instant {} as {} file groups", 
compactionInstantTime, operations.size());
+
+    WriteMarkersFactory
+        .get(table.getConfig().getMarkersType(), table, compactionInstantTime)
+        .deleteMarkerDir(table.getContext(), 
table.getConfig().getMarkersDeleteParallelism());
+
+    Map<String, Integer> fileIdIndexMap = new HashMap<>();
+    int index = 0;
+    for (CompactionOperation operation : operations) {
+      int operationIndex;
+      if (fileIdIndexMap.containsKey(operation.getFileId())) {
+        operationIndex = fileIdIndexMap.get(operation.getFileId());
+      } else {
+        operationIndex = index;
+        fileIdIndexMap.put(operation.getFileId(), operationIndex);
+        index++;
+      }
+      output.collect(new StreamRecord<>(createPlanEvent(compactionInstantTime, 
operation, operationIndex)));
+    }
+  }
+
+  /**
+   * Creates a compaction plan event for a single compaction operation.
+   *
+   * @param compactionInstantTime The instant time for this compaction
+   * @param operation             The compaction operation to execute
+   * @param operationIndex        The index of this operation for task 
distribution
+   * @return a new compaction plan event
+   */
+  protected CompactionPlanEvent createPlanEvent(String compactionInstantTime, 
CompactionOperation operation, int operationIndex) {
+    return new CompactionPlanEvent(compactionInstantTime, operation, 
operationIndex);
+  }
+
+  /**
+   * Closes the handler and releases resources.
+   */
+  @Override
+  public void close() {
+    this.writeClient.close();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactCommitHandler.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactCommitHandler.java
new file mode 100644
index 000000000000..87ff1d583f32
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactCommitHandler.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.handler;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
+import org.apache.hudi.sink.compact.CompactionCommitEvent;
+import org.apache.hudi.table.action.compact.CompactHelpers;
+import org.apache.hudi.util.CompactionUtil;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Handler for committing compaction metadata to metadata table timeline.
+ *
+ * <p>This handler extends {@link CompactCommitHandler} to support metadata 
table specific
+ * compaction commit actions, including:
+ * <ul>
+ *   <li>Retrieves compaction plans for both compaction and log 
compaction;</li>
+ *   <li>Commits compaction metadata;</li>
+ *   <li>Commits log compaction metadata;</li>
+ *   <li>Rolls back failed compactions;</li>
+ * </ul>
+ *
+ * <p>The handler distinguishes between compaction(full compaction) and log 
compaction(minor compaction) based on
+ * the {@link CompactionCommitEvent#isLogCompaction()} flag, and triggers 
compaction completion or rollback
+ * appropriately with the write client.
+ *
+ * @see CompactCommitHandler
+ * @see CompactionCommitEvent
+ */
+public class MetadataCompactCommitHandler extends CompactCommitHandler {
+
+  public MetadataCompactCommitHandler(Configuration conf, 
HoodieFlinkWriteClient writeClient) {
+    super(conf, writeClient);
+  }
+
+  /**
+   * Completes a compaction for metadata tables.
+   *
+   * <p>This method is overridden to support both compaction(full compaction)
+   * and log compaction(minor compaction) for metadata tables. It creates 
appropriate metadata based on the
+   * operation type and completes the compaction.
+   *
+   * @param instant           The compaction instant time
+   * @param isLogCompaction   Whether the compaction is log compaction
+   * @param statuses          List of write statuses from all compaction 
operations
+   * @param compactionMetrics Metrics collector for tracking compaction 
progress
+   */
+  @Override
+
+  protected void completeCompaction(String instant,
+                                    boolean isLogCompaction,
+                                    List<WriteStatus> statuses,
+                                    FlinkCompactionMetrics compactionMetrics) 
throws IOException {
+    WriteOperationType operationType = isLogCompaction ? 
WriteOperationType.LOG_COMPACT : WriteOperationType.COMPACT;
+    HoodieCommitMetadata metadata = 
CompactHelpers.getInstance().createCompactionMetadata(
+        table, instant, HoodieListData.eager(statuses), 
writeClient.getConfig().getSchema(), operationType);
+
+    // commit the compaction
+    if (isLogCompaction) {
+      writeClient.completeLogCompaction(metadata, table, instant);
+    } else {
+      writeClient.completeCompaction(metadata, table, instant);
+    }
+
+    compactionMetrics.updateCommitMetrics(instant, metadata);
+    compactionMetrics.markCompactionCompleted();
+  }
+
+  /**
+   * Rolls back a failed compaction.
+   *
+   * <p>This method is overridden to support rolling back of both compaction 
and log compaction.
+   *
+   * @param instant         The compaction instant time
+   * @param isLogCompaction Whether the compaction is log compaction
+   */
+  @Override
+  protected void rollbackCompaction(String instant, boolean isLogCompaction) {
+    if (isLogCompaction) {
+      CompactionUtil.rollbackLogCompaction(table, instant, 
writeClient.getTransactionManager());
+    } else {
+      CompactionUtil.rollbackCompaction(table, instant, 
writeClient.getTransactionManager());
+    }
+  }
+
+  /**
+   * Retrieves the compaction plan for a metadata table compaction instant.
+   *
+   * <p>This method is overridden to support retrieving both
+   * compaction plans and log compaction plans based on the compaction type.
+   * It uses a cache to avoid repeatedly reading plans from storage.
+   *
+   * @param instant         The compaction instant time
+   * @param isLogCompaction Whether the compaction is log compaction
+   *
+   * @return The compaction plan for the instant
+   * @throws HoodieException If the compaction plan cannot be retrieved
+   */
+  @Override
+  protected HoodieCompactionPlan getCompactionPlan(String instant, boolean 
isLogCompaction) {
+    return compactionPlanCache.computeIfAbsent(instant, k -> {
+      try {
+        return isLogCompaction ? 
CompactionUtils.getLogCompactionPlan(this.writeClient.getHoodieTable().getMetaClient(),
 instant)
+            : 
CompactionUtils.getCompactionPlan(this.writeClient.getHoodieTable().getMetaClient(),
 instant);
+      } catch (Exception e) {
+        throw new HoodieException(e);
+      }
+    });
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactHandler.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactHandler.java
new file mode 100644
index 000000000000..4ae3ee0c9477
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactHandler.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.handler;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.AvroReaderContextFactory;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
+import org.apache.hudi.sink.compact.CompactionCommitEvent;
+import org.apache.hudi.sink.compact.CompactionPlanEvent;
+import org.apache.hudi.table.action.compact.CompactHelpers;
+import 
org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
+
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+
+/**
+ * Handler for compaction operation execution on Hudi metadata tables.
+ *
+ * <p>This handler extends {@link CompactHandler} to support metadata table 
specific
+ * compaction operations, covering compaction and log compaction.
+ *
+ * <p>The handler uses {@link AvroReaderContextFactory} to create reader 
contexts
+ * for metadata table payloads, which differs from data table records(engine 
native).
+ *
+ * @see CompactHandler
+ * @see CompactionPlanEvent
+ * @see AvroReaderContextFactory
+ */
+public class MetadataCompactHandler extends CompactHandler {
+
+  public MetadataCompactHandler(HoodieFlinkWriteClient writeClient, int 
taskId) {
+    super(writeClient, taskId);
+  }
+
+  /**
+   * Creates a reader context for reading metadata table records.
+   *
+   * <p>This method is overridden to create an Avro-based reader context
+   * adapted for metadata table file reading, which has a custom payload class.
+   *
+   * @param needReloadMetaClient Whether the meta client needs to be reloaded
+   *
+   * @return A reader context configured for metadata table file reading
+   */
+  @Override
+  protected HoodieReaderContext<?> createReaderContext(boolean 
needReloadMetaClient) {
+    String payloadClass = 
ConfigUtils.getPayloadClass(writeClient.getConfig().getProps());
+    AvroReaderContextFactory readerContextFactory = new 
AvroReaderContextFactory(table.getMetaClient(), payloadClass, 
writeClient.getConfig().getProps());
+    return readerContextFactory.getContext();
+  }
+
+  /**
+   * Executes a compaction operation.
+   *
+   * <p>This method is overridden to support both compaction(full compaction)
+   * and log compaction(minor compaction) for metadata tables.
+   *
+   * @param event                The compaction plan event containing the 
operation details
+   * @param collector            Collector for emitting compaction commit 
events
+   * @param needReloadMetaClient Whether the meta client needs to be reloaded
+   * @param compactionMetrics    Metrics collector for tracking compaction 
progress
+   */
+  @Override
+  protected void doCompaction(CompactionPlanEvent event,
+                              Collector<CompactionCommitEvent> collector,
+                              boolean needReloadMetaClient,
+                              FlinkCompactionMetrics compactionMetrics) throws 
Exception {
+    if (!event.isLogCompaction()) {
+      super.doCompaction(event, collector, needReloadMetaClient, 
compactionMetrics);
+    } else {
+      compactionMetrics.startCompaction();
+      // Create a write client specifically for the metadata table
+      HoodieFlinkMergeOnReadTableCompactor<?> compactor = new 
HoodieFlinkMergeOnReadTableCompactor<>();
+      HoodieTableMetaClient metaClient = table.getMetaClient();
+      if (needReloadMetaClient) {
+        // reload the timeline
+        metaClient.reload();
+      }
+      Option<InstantRange> instantRange = 
CompactHelpers.getInstance().getInstantRange(metaClient);
+      List<WriteStatus> writeStatuses = compactor.logCompact(
+          writeClient.getConfig(),
+          event.getOperation(),
+          event.getCompactionInstantTime(),
+          instantRange,
+          table,
+          table.getTaskContextSupplier());
+      compactionMetrics.endCompaction();
+      collector.collect(createCommitEvent(event, writeStatuses));
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactionPlanHandler.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactionPlanHandler.java
new file mode 100644
index 000000000000..3a8ef982eea7
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactionPlanHandler.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.handler;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
+import org.apache.hudi.sink.compact.CompactionPlanEvent;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+import org.apache.hudi.util.CompactionUtil;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Handler for scheduling compaction plans on Hudi metadata tables.
+ *
+ * <p>This handler extends {@link CompactionPlanHandler} to support metadata 
table specific
+ * compaction operations, including:
+ * <ul>
+ *   <li>Compaction;</li>
+ *   <li>Log compaction(if enabled);</li>
+ *   <li>Rollback of the compactions;</li>
+ * </ul>
+ *
+ * <p>The handler first attempts to collect compaction operations. If no 
compaction
+ * plan is scheduled and log compaction is enabled, it collects the log 
compaction operations instead.
+ * This ensures efficient file layout of metadata table storage.
+ *
+ * @see CompactionPlanHandler
+ * @see CompactionPlanEvent
+ */
+@Slf4j
+public class MetadataCompactionPlanHandler extends CompactionPlanHandler {
+
+  public MetadataCompactionPlanHandler(HoodieFlinkWriteClient writeClient) {
+    super(writeClient);
+  }
+
+  /**
+   * Collects compaction operations for metadata tables.
+   *
+   * <p>This method is overridden to support both compaction(full compaction)
+   * and log compaction(minor compaction) for metadata tables. It first 
attempts to collect compaction operations.
+   * If no compaction plan is scheduled and log compaction is enabled, it 
collects the log compaction operations instead.
+   *
+   * @param checkpointId      The Flink checkpoint ID triggering this 
scheduling
+   * @param compactionMetrics Metrics collector for tracking compaction 
progress
+   * @param output            Output stream for emitting compaction plan events
+   */
+  @Override
+  public void collectCompactionOperations(long checkpointId, 
FlinkCompactionMetrics compactionMetrics, 
Output<StreamRecord<CompactionPlanEvent>> output) {
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    metaClient.reloadActiveTimeline();
+    // retrieve compaction plan
+    HoodieTimeline pendingCompactionTimeline = 
metaClient.getActiveTimeline().filterPendingCompactionTimeline();
+    Option<Pair<String, HoodieCompactionPlan>> instantAndPlanOpt = 
getCompactionPlan(
+        metaClient, pendingCompactionTimeline, checkpointId, 
compactionMetrics, CompactionUtils::getCompactionPlan);
+    if (instantAndPlanOpt.isPresent()) {
+      doCollectCompactionOperations(instantAndPlanOpt.get().getLeft(), 
instantAndPlanOpt.get().getRight(), output);
+      return;
+    }
+    if (!writeClient.getConfig().isLogCompactionEnabled()) {
+      return;
+    }
+    // for log compaction
+    HoodieTimeline pendingLogCompactionTimeline = 
metaClient.getActiveTimeline().filterPendingLogCompactionTimeline();
+    instantAndPlanOpt = getCompactionPlan(
+        metaClient, pendingLogCompactionTimeline, checkpointId, 
compactionMetrics, CompactionUtils::getLogCompactionPlan);
+    if (instantAndPlanOpt.isPresent()) {
+      doCollectLogCompactions(instantAndPlanOpt.get().getLeft(), 
instantAndPlanOpt.get().getRight(), output);
+    }
+  }
+
+  /**
+   * Rolls back pending compaction operations for metadata tables.
+   *
+   * <p>This method is overridden to support rolling back both compaction and 
log compaction.
+   */
+  @Override
+  public void rollbackCompaction() {
+    super.rollbackCompaction();
+    if (writeClient.getConfig().isLogCompactionEnabled()) {
+      CompactionUtil.rollbackLogCompaction(table, writeClient);
+    }
+  }
+
+  /**
+   * Creates a compaction plan event for metadata table operations.
+   *
+   * <p>This method is overridden to create the metadata table compaction plan 
events.
+   *
+   * @param compactionInstantTime The instant time for the compaction
+   * @param operation             The compaction operation details
+   * @param operationIndex        The index of this operation in the 
compaction plan
+   *
+   * @return A compaction plan event configured for metadata table compaction
+   */
+  @Override
+  protected CompactionPlanEvent createPlanEvent(String compactionInstantTime, 
CompactionOperation operation, int operationIndex) {
+    return new CompactionPlanEvent(compactionInstantTime, operation, 
operationIndex, true, false);
+  }
+
+  /**
+   * Collects and emits log compaction plan events for metadata tables.
+   *
+   * <p>This method transitions the log compaction instant from requested to 
inflight,
+   * extracts compaction operations from the plan, deletes marker directories, 
and
+   * emits compaction plan events for each operation. Operations with the same 
file ID
+   * are assigned with the same operation index to ensure they are processed 
together.
+   *
+   * @param compactionInstantTime The instant time for the log compaction
+   * @param compactionPlan        The log compaction plan containing 
operations to execute
+   * @param output                Output stream for emitting compaction plan 
events
+   */
+  public void doCollectLogCompactions(
+      String compactionInstantTime,
+      HoodieCompactionPlan compactionPlan,
+      Output<StreamRecord<CompactionPlanEvent>> output) {
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    // Mark log compaction as inflight
+    HoodieInstant instant = 
metaClient.getInstantGenerator().getLogCompactionRequestedInstant(compactionInstantTime);
+    
metaClient.getActiveTimeline().transitionLogCompactionRequestedToInflight(instant);
+    metaClient.reloadActiveTimeline();
+
+    List<CompactionOperation> operations = 
compactionPlan.getOperations().stream()
+        
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
+    log.info("Execute log compaction plan for instant {} as {} file groups", 
compactionInstantTime, operations.size());
+
+    WriteMarkersFactory
+        .get(table.getConfig().getMarkersType(), table, compactionInstantTime)
+        .deleteMarkerDir(table.getContext(), 
table.getConfig().getMarkersDeleteParallelism());
+
+    Map<String, Integer> fileIdIndexMap = new HashMap<>();
+    int index = 0;
+    for (CompactionOperation operation : operations) {
+      int operationIndex;
+      if (fileIdIndexMap.containsKey(operation.getFileId())) {
+        operationIndex = fileIdIndexMap.get(operation.getFileId());
+      } else {
+        operationIndex = index;
+        fileIdIndexMap.put(operation.getFileId(), operationIndex);
+        index++;
+      }
+      output.collect(new StreamRecord<>(
+          new CompactionPlanEvent(compactionInstantTime, operation, 
operationIndex, true, true)));
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/v2/utils/PipelinesV2.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/v2/utils/PipelinesV2.java
index fb9852ef4a22..6e93c02bc137 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/v2/utils/PipelinesV2.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/v2/utils/PipelinesV2.java
@@ -135,7 +135,7 @@ public class PipelinesV2 {
     if (OptionsResolver.needsAsyncCompaction(conf)) {
       // use synchronous compaction for bounded source.
       if (isBounded) {
-        conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+        conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED, 
false);
       }
       return compactV2(conf, pipeline);
     } else {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 46727a1540fa..f46e2e672222 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -125,10 +125,10 @@ public class HoodieTableSink implements
       final DataStream<HoodieFlinkInternalRow> hoodieRecordDataStream = 
Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
       pipeline = Pipelines.hoodieStreamWrite(conf, rowType, 
hoodieRecordDataStream);
       // compaction
-      if (OptionsResolver.needsAsyncCompaction(conf)) {
+      if (OptionsResolver.needsAsyncCompaction(conf) || 
OptionsResolver.needsAsyncMetadataCompaction(conf)) {
         // use synchronous compaction for bounded source.
         if (context.isBounded()) {
-          conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+          conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED, 
false);
         }
         return Pipelines.compact(conf, pipeline);
       } else {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index b2e5072cdbbb..8e5456b8f1a6 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -65,6 +65,55 @@ public class CompactionUtil {
     }
   }
 
+  /**
+   * Schedules a new compaction/log compaction for the metadata table.
+   *
+   * @param writeClient The metadata write client
+   * @param committed   Whether the triggering instant was committed 
successfully
+   */
+  public static void scheduleMetadataCompaction(
+      HoodieFlinkWriteClient<?> writeClient,
+      boolean committed) {
+    if (!committed) {
+      return;
+    }
+    if (canScheduleMetadataCompaction(writeClient.getConfig(), 
writeClient.getHoodieTable().getMetaClient())) {
+      Option<String> compactInstant = 
writeClient.scheduleCompaction(Option.empty());
+      if (compactInstant.isPresent()) {
+        log.info("Scheduled compaction {} for metadata table.", 
compactInstant.get());
+        return;
+      }
+      if (!writeClient.getConfig().isLogCompactionEnabled()) {
+        return;
+      }
+      Option<String> logCompactionInstant = 
writeClient.scheduleLogCompaction(Option.empty());
+      if (logCompactionInstant.isPresent()) {
+        log.info("Scheduled log compaction {} for metadata table.", 
logCompactionInstant.get());
+      }
+    }
+  }
+
+  /**
+   * Validates the timeline for both data and metadata tables to ensure 
compaction on MDT can be scheduled.
+   *
+   * @param metadataWriteConfig The write config for metadata write client
+   * @param metadataMetaClient The table metadata client for metadata table
+   *
+   * @return True if the compaction/log compaction can be scheduled for 
metadata table.
+   */
+  private static boolean canScheduleMetadataCompaction(HoodieWriteConfig 
metadataWriteConfig, HoodieTableMetaClient metadataMetaClient) {
+    // Under the log compaction scope, the sequence of the log-compaction and 
compaction needs to be ensured because metadata items such as RLI
+    // only has proc-time ordering semantics. For "ensured", it means the 
completion sequence of the log-compaction/compaction is the same as the start 
sequence.
+    if (metadataWriteConfig.isLogCompactionEnabled()) {
+      Option<HoodieInstant> pendingLogCompactionInstant =
+          
metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+      Option<HoodieInstant> pendingCompactionInstant =
+          
metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+      return !pendingLogCompactionInstant.isPresent() && 
!pendingCompactionInstant.isPresent();
+    }
+    return true;
+  }
+
   /**
    * Sets up the avro schema string into the give configuration {@code conf}
    * through reading from the hoodie table metadata.
@@ -158,6 +207,33 @@ public class CompactionUtil {
     }
   }
 
+  public static void rollbackLogCompaction(HoodieFlinkTable<?> table, String 
instantTime, TransactionManager transactionManager) {
+    HoodieInstant inflightInstant = 
table.getInstantGenerator().getLogCompactionInflightInstant(instantTime);
+    if 
(table.getMetaClient().reloadActiveTimeline().filterPendingLogCompactionTimeline().containsInstant(inflightInstant))
 {
+      log.warn("Failed to rollback log compaction instant: [{}]", instantTime);
+      table.rollbackInflightLogCompaction(inflightInstant, transactionManager);
+    }
+  }
+
+  /**
+   * Force rolls back all the inflight log compaction instants, especially for 
job failover restart.
+   *
+   * @param table The hoodie table
+   */
+  public static void rollbackLogCompaction(HoodieFlinkTable<?> table, 
HoodieFlinkWriteClient writeClient) {
+    HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline()
+        .filterPendingLogCompactionTimeline()
+        .filter(instant ->
+            instant.getState() == HoodieInstant.State.INFLIGHT);
+    inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
+      log.info("Rollback the inflight log compaction instant: {} for 
failover", inflightInstant);
+      table.rollbackInflightLogCompaction(inflightInstant, 
writeClient.getTransactionManager());
+    });
+    if (!inflightCompactionTimeline.getInstants().isEmpty()) {
+      table.getMetaClient().reloadActiveTimeline();
+    }
+  }
+
   /**
    * Force rolls back all the inflight compaction instants, especially for job 
failover restart.
    *
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 56e3c7646066..ecf7b73e312d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.util;
 
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
 import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
@@ -62,6 +63,8 @@ import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
+import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.schema.FilebasedSchemaProvider;
 import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.sink.transform.Transformer;
@@ -135,6 +138,18 @@ public class StreamerUtil {
     return properties;
   }
 
+  /**
+   * Creates the metadata write client from the given write client for data 
table.
+   */
+  public static HoodieFlinkWriteClient 
createMetadataWriteClient(HoodieFlinkWriteClient dataWriteClient) {
+    // Get the metadata writer from the table and use its write client
+    Option<HoodieTableMetadataWriter> metadataWriterOpt =
+        dataWriteClient.getHoodieTable().getMetadataWriter(null, true, true);
+    ValidationUtils.checkArgument(metadataWriterOpt.isPresent(), "Failed to 
create the metadata writer");
+    FlinkHoodieBackedTableMetadataWriter metadataWriter = 
(FlinkHoodieBackedTableMetadataWriter) metadataWriterOpt.get();
+    return (HoodieFlinkWriteClient) metadataWriter.getWriteClient();
+  }
+
   public static HoodieSchema 
getSourceSchema(org.apache.flink.configuration.Configuration conf) {
     if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()) {
       return new FilebasedSchemaProvider(conf).getSourceHoodieSchema();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index d5bd4f895bc1..595cea5f7b94 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -169,7 +169,7 @@ public class ITTestDataStreamWrite extends TestLogger {
     conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
     conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
     // use synchronized compaction to ensure flink job finishing with 
compaction completed.
-    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED, false);
     conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
 
     defaultWriteAndCheckExpected(conf, "mor_write_with_compact", 1);
@@ -212,7 +212,7 @@ public class ITTestDataStreamWrite extends TestLogger {
   public void testStreamWriteWithIndexBootstrap(String tableType) throws 
Exception {
     Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.toURI().toString());
     // use synchronized compaction to avoid sleeping for async compact.
-    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED, false);
     conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
     conf.set(FlinkOptions.TABLE_TYPE, tableType);
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index eb25855dc1b2..60560b7cb3a6 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -55,7 +55,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
@@ -63,6 +65,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -857,12 +860,17 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
     testHarness.checkpointComplete(1).checkWrittenData(EXPECTED1).end();
   }
 
-  @Test
-  public void testBucketAssignWithRLI() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testBucketAssignWithRLI(boolean mdtCompactionEnabled) throws 
Exception {
     // use record level index
     conf.set(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
     
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
 "true");
     conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), "true");
+    conf.set(FlinkOptions.METADATA_COMPACTION_SCHEDULE_ENABLED, 
mdtCompactionEnabled);
+    if (mdtCompactionEnabled) {
+      conf.set(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 1);
+    }
     TestHarness testHarness =
         preparePipeline(conf)
             .consume(TestData.DATA_SET_INSERT)
@@ -895,6 +903,10 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .checkpointComplete(2)
         .checkWrittenData(EXPECTED1)
         .end();
+
+    if (mdtCompactionEnabled) {
+      TestUtils.validateMdtCompactionInstant(conf.get(FlinkOptions.PATH), 
false);
+    }
   }
 
   @Test
@@ -938,12 +950,22 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .end();
   }
 
-  @Test
-  public void testIndexWriteFunctionWithMultipleCheckpoints() throws Exception 
{
+  @ParameterizedTest
+  @MethodSource("mdtCompactionParams")
+  public void testIndexWriteFunctionWithMultipleCheckpoints(boolean 
mdtCompactionEnabled, boolean isLogCompaction) throws Exception {
     // Test IndexWriteFunction with multiple checkpoint operations
     conf.set(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
     
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
 "true");
     conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), "true");
+    conf.set(FlinkOptions.METADATA_COMPACTION_SCHEDULE_ENABLED, 
mdtCompactionEnabled);
+    if (mdtCompactionEnabled) {
+      if (isLogCompaction) {
+        
conf.setString(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE.key(),
 "true");
+        
conf.setString(HoodieMetadataConfig.LOG_COMPACT_BLOCKS_THRESHOLD.key(), "2");
+      } else {
+        conf.set(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 1);
+      }
+    }
 
     TestHarness testHarness =
         preparePipeline()
@@ -985,6 +1007,23 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         getRecordKeyIndex(metaClient, Arrays.asList("id1", "id2", "id3", 
"id4", "id5", "id6", "id7", "id8", "id9", "id10", "id11"));
     assertEquals(11, result.size());
     result.forEach((key, value) -> 
assertEquals(expectedKeyPartitionMap.get(key), value.getPartitionPath()));
+
+    if (mdtCompactionEnabled) {
+      TestUtils.validateMdtCompactionInstant(conf.get(FlinkOptions.PATH), 
isLogCompaction);
+    }
+  }
+
+  /**
+   * Return test params => (mdt compaction enabled, log compaction enabled).
+   */
+  private static Stream<Arguments> mdtCompactionParams() {
+    Object[][] data =
+        new Object[][] {
+            {true, false},
+            {true, true},
+            {false, false},
+            {false, true}};
+    return Stream.of(data).map(Arguments::of);
   }
 
   private Map<String, HoodieRecordGlobalLocation> 
getRecordKeyIndex(HoodieTableMetaClient metaClient, List<String> keys) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
index 6e585f3386e9..7438773db6ee 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
@@ -88,7 +88,7 @@ public class CompactFunctionWrapper {
 
   public void openFunction() throws Exception {
     compactionPlanOperator = new CompactionPlanOperator(conf);
-    planEventOutput =  new CollectOutputAdapter<>();
+    planEventOutput = new CollectOutputAdapter<>();
     compactionPlanOperator.setup(streamTask, streamConfig, planEventOutput);
     compactionPlanOperator.open();
 
@@ -109,7 +109,6 @@ public class CompactFunctionWrapper {
 
   public void compact(long checkpointID) throws Exception {
     // collect the CompactEvents.
-    compactionPlanOperator.setOutput(planEventOutput);
     compactionPlanOperator.notifyCheckpointComplete(checkpointID);
     // collect the CompactCommitEvents
     for (CompactionPlanEvent event : planEventOutput.getRecords()) {
@@ -119,6 +118,11 @@ public class CompactFunctionWrapper {
     for (CompactionCommitEvent event : commitEventOutput.getRecords()) {
       commitSink.invoke(event, null);
     }
+    // reset collector
+    planEventOutput = new CollectOutputAdapter<>();
+    compactionPlanOperator.setOutput(planEventOutput);
+    commitEventOutput = new CollectOutputAdapter<>();
+    compactOperator.setOutput(commitEventOutput);
   }
 
   public void close() throws Exception {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 4a83dad542ff..5d0bd8cd6157 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -151,7 +151,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     this.stateInitializationContext = new MockStateInitializationContext();
     this.indexStateInitializationContext = new 
MockStateInitializationContext();
     this.coordinatorStateStore = new TreeMap<>();
-    this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
+    this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf) || 
OptionsResolver.needsAsyncMetadataCompaction(conf);
     this.isStreamingWriteIndexEnabled = 
OptionsResolver.isStreamingIndexWriteEnabled(conf);
     this.streamConfig = new StreamConfig(conf);
     streamConfig.setOperatorID(new OperatorID());
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 2fc626487057..6c1cc68705cc 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -547,8 +547,8 @@ public class ITTestHoodieDataSource {
   }
 
   @ParameterizedTest
-  @EnumSource(value = HoodieTableType.class)
-  void testDataSkippingWithRecordLevelIndex(HoodieTableType tableType) throws 
Exception {
+  @MethodSource("tableTypeAndBooleanTrueFalseParams")
+  void testDataSkippingWithRecordLevelIndex(HoodieTableType tableType, boolean 
mdtCompactionEnabled) throws Exception {
     TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
@@ -557,10 +557,15 @@ public class ITTestHoodieDataSource {
         .option(FlinkOptions.METADATA_ENABLED, true)
         .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
         .option(FlinkOptions.TABLE_TYPE, tableType.name())
+        .option(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 
mdtCompactionEnabled ? 1 : 10)
         .end();
     tableEnv.executeSql(hoodieTableDDL);
     execInsertSql(tableEnv, TestSQL.INSERT_T1);
 
+    if (mdtCompactionEnabled) {
+      TestUtils.validateMdtCompactionInstant(tempFile.getAbsolutePath(), 
false);
+    }
+
     List<Row> result1 = CollectionUtil.iterableToList(
         () -> tableEnv.sqlQuery("select * from t1 where uuid = 
'id1'").execute().collect());
     assertRowsEquals(result1, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, 
par1]]");
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
index 6e1b5d372add..bcfe719e2015 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -467,7 +467,7 @@ public class ITTestSchemaEvolution {
 
   private void doCompact(Configuration conf) throws Exception {
     // use sync compaction to ensure compaction finished.
-    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED, false);
     try (HoodieFlinkWriteClient writeClient = 
FlinkWriteClients.createWriteClient(conf)) {
       HoodieFlinkTable<?> table = writeClient.getHoodieTable();
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
index 115eec7c88d0..c000fd20fb03 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -53,6 +54,7 @@ import java.util.stream.IntStream;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -63,6 +65,7 @@ public class TestCompactionUtil {
   private HoodieFlinkTable<?> table;
   private HoodieTableMetaClient metaClient;
   private Configuration conf;
+  private FlinkHoodieBackedTableMetadataWriter flinkMetadataWriter;
 
   @TempDir
   File tempFile;
@@ -82,8 +85,8 @@ public class TestCompactionUtil {
     this.metaClient = table.getMetaClient();
     // initialize the metadata table path
     if (conf.get(FlinkOptions.METADATA_ENABLED)) {
-      FlinkHoodieBackedTableMetadataWriter.create(table.getStorageConf(), 
table.getConfig(),
-          table.getContext(), Option.empty());
+      this.flinkMetadataWriter = (FlinkHoodieBackedTableMetadataWriter) 
FlinkHoodieBackedTableMetadataWriter.create(
+          table.getStorageConf(), table.getConfig(), table.getContext(), 
Option.empty());
     }
   }
 
@@ -182,5 +185,126 @@ public class TestCompactionUtil {
     metaClient.reloadActiveTimeline();
     return instantTime;
   }
+
+  @Test
+  void testScheduleMetadataCompactionSuccess() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.METADATA_ENABLED.key(), "true");
+    options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "true");
+    options.put(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS.key(), "1");
+    beforeEach(options);
+
+    try (HoodieFlinkWriteClient metadataWriteClient = (HoodieFlinkWriteClient) 
flinkMetadataWriter.getWriteClient()) {
+      HoodieFlinkTable metadataTable = metadataWriteClient.getHoodieTable();
+      HoodieTableMetaClient metadataMetaClient = metadataTable.getMetaClient();
+
+      // Initially, there should be no pending compactions
+      int initialCount = metadataMetaClient.reloadActiveTimeline()
+          .filterPendingCompactionTimeline()
+          .getInstants().size();
+
+      // Call scheduleMetadataCompaction with committed = true when there are 
no pending compactions
+      CompactionUtil.scheduleMetadataCompaction(metadataWriteClient, true);
+
+      int finalCount = metadataMetaClient.reloadActiveTimeline()
+          .filterPendingCompactionTimeline()
+          .getInstants().size();
+
+      // A new compaction should have been scheduled, so count should increase
+      assertThat(finalCount, is(initialCount + 1));
+    }
+  }
+
+  @Test
+  void testScheduleMetadataLogCompactionFallback() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.METADATA_ENABLED.key(), "true");
+    options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "true");
+    options.put(HoodieMetadataConfig.LOG_COMPACT_BLOCKS_THRESHOLD.key(), "1");
+    
options.put(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE.key(), 
"true");
+    beforeEach(options);
+
+    try (HoodieFlinkWriteClient metadataWriteClient = (HoodieFlinkWriteClient) 
flinkMetadataWriter.getWriteClient()) {
+      HoodieFlinkTable metadataTable = metadataWriteClient.getHoodieTable();
+      HoodieTableMetaClient metadataMetaClient = metadataTable.getMetaClient();
+      // Initially, check the timeline for log compaction instants
+      int initialLogCompactionCount = metadataMetaClient.reloadActiveTimeline()
+          .filterPendingLogCompactionTimeline()
+          .getInstants().size();
+
+      assertEquals(0, initialLogCompactionCount);
+
+      // Call scheduleMetadataCompaction with committed = true
+      CompactionUtil.scheduleMetadataCompaction(metadataWriteClient, true);
+
+      int finalLogCompactionCount = metadataMetaClient.reloadActiveTimeline()
+          .filterPendingLogCompactionTimeline()
+          .getInstants().size();
+
+      // If regular compaction fails, it might fall back to log compaction if 
enabled
+      // The exact behavior depends on the implementation, so we just ensure 
no exception occurs
+      // and that the method executes properly
+      assertEquals(1, finalLogCompactionCount);
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void rollbackLogCompactionWithWriteClient(boolean rollbackSingleInstant) 
throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.METADATA_ENABLED.key(), "true");
+    options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "true");
+    options.put(HoodieMetadataConfig.LOG_COMPACT_BLOCKS_THRESHOLD.key(), "1");
+    
options.put(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE.key(), 
"true");
+    beforeEach(options);
+
+    try (HoodieFlinkWriteClient metadataWriteClient = (HoodieFlinkWriteClient) 
flinkMetadataWriter.getWriteClient()) {
+      HoodieFlinkTable metadataTable = metadataWriteClient.getHoodieTable();
+      HoodieTableMetaClient metadataMetaClient = metadataTable.getMetaClient();
+
+      // Generate a log compaction plan and transition it to inflight
+      String instantTime = generateLogCompactionPlan(metadataMetaClient, 
metadataTable);
+
+      // Verify that the log compaction is in inflight state
+      List<HoodieInstant> inflightInstants = 
metadataMetaClient.getActiveTimeline()
+          .filterPendingLogCompactionTimeline()
+          .filter(instant -> instant.getState() == 
HoodieInstant.State.INFLIGHT)
+          .getInstants();
+      assertThat("There should be one inflight log compaction instant", 
inflightInstants.size(), is(1));
+      assertEquals(instantTime, inflightInstants.get(0).requestedTime());
+
+      // Call the rollbackLogCompaction method with writeClient
+      if (rollbackSingleInstant) {
+        CompactionUtil.rollbackLogCompaction(metadataTable, instantTime, 
metadataWriteClient.getTransactionManager());
+      } else {
+        CompactionUtil.rollbackLogCompaction(metadataTable, 
metadataWriteClient);
+      }
+
+      // Reload the timeline to check the state after rollback
+      metadataMetaClient.reloadActiveTimeline();
+
+      // Check that the log compaction instant is now in REQUESTED state 
(rolled back)
+      List<HoodieInstant> requestedInstants = 
metadataMetaClient.getActiveTimeline()
+          .filterPendingLogCompactionTimeline()
+          .getInstants();
+
+      assertThat("There should be no requested instant after rollback", 
requestedInstants.size(), is(0));
+    }
+  }
+
+  /**
+   * Generates a log compaction plan on the timeline and returns its instant 
time.
+   */
+  private String generateLogCompactionPlan(HoodieTableMetaClient metaClient, 
HoodieFlinkTable table) {
+    HoodieCompactionOperation operation = new HoodieCompactionOperation();
+    HoodieCompactionPlan plan = new 
HoodieCompactionPlan(Collections.singletonList(operation), 
Collections.emptyMap(), 1, null, null, null);
+    String instantTime = WriteClientTestUtils.createNewInstantTime();
+    HoodieInstant logCompactionInstant =
+        INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.LOG_COMPACTION_ACTION, instantTime);
+    
metaClient.getActiveTimeline().saveToLogCompactionRequested(logCompactionInstant,
 plan);
+    
table.getActiveTimeline().transitionLogCompactionRequestedToInflight(logCompactionInstant);
+    metaClient.reloadActiveTimeline();
+    return instantTime;
+  }
 }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
index adbee35dd7c6..f7e697b44f85 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utils;
 
 import org.apache.hudi.client.WriteClientTestUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -30,6 +31,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.sink.utils.MockStreamingRuntimeContext;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
 import org.apache.hudi.storage.StoragePath;
@@ -48,6 +50,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
 
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -76,6 +80,21 @@ public class TestUtils {
         .orElse(null);
   }
 
+  public static void validateMdtCompactionInstant(String basePath, boolean 
isLogCompaction) throws IOException {
+    String baseMdtPath = 
HoodieTableMetadata.getMetadataTableBasePath(basePath);
+    String compactInstant = TestUtils.getLastCompleteInstant(baseMdtPath);
+    assertNotNull(compactInstant);
+    WriteOperationType writeOperationType = isLogCompaction ? 
WriteOperationType.LOG_COMPACT : WriteOperationType.COMPACT;
+    assertEquals(writeOperationType, TestUtils.getOperationType(baseMdtPath, 
compactInstant));
+  }
+
+  private static WriteOperationType getOperationType(String basePath, String 
instantTs) throws IOException {
+    final HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(
+        new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(new 
Configuration())), basePath);
+    HoodieInstant hoodieInstant = 
metaClient.getCommitsTimeline().filter(instant -> 
instant.requestedTime().equals(instantTs)).firstInstant().get();
+    return 
metaClient.getCommitTimeline().readCommitMetadata(hoodieInstant).getOperationType();
+  }
+
   public static String getLastDeltaCompleteInstant(String basePath) {
     final HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(
         new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(new 
Configuration())), basePath);


Reply via email to