This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 8e2a32e994a4a96211384195202ded38e71890b5
Author: Danny Chan <[email protected]>
AuthorDate: Thu Oct 16 19:05:52 2025 +0800

    fix: flink mdt compaction should finish pending compactions first (#14095)
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |  2 +-
 .../hudi/client/HoodieFlinkTableServiceClient.java |  8 ---
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  6 ---
 .../FlinkHoodieBackedTableMetadataWriter.java      | 58 +---------------------
 .../sink/TestStreamWriteOperatorCoordinator.java   | 12 ++++-
 5 files changed, 14 insertions(+), 72 deletions(-)
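
Summary of the fix: the Flink metadata-table writer previously kept its own
inline copy of the MDT delta-commit flow and ran compaction without first
completing compactions left pending on the timeline. The patch deletes that
copy and routes table services through performTableServices plus
super.commitInternal so that pending compactions are finished before a new
one is scheduled. A hypothetical, self-contained Java sketch of that
ordering (none of these names are Hudi APIs):

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Hypothetical sketch only; it models the ordering this patch enforces:
    // finish compactions left pending by an earlier failure before
    // scheduling a new one.
    public class PendingFirstCompactor {
      private final Queue<String> pendingCompactionInstants = new ArrayDeque<>();

      public void onCommit(String instantTime) {
        // 1. Re-attempt and complete every pending compaction first (the fix).
        while (!pendingCompactionInstants.isEmpty()) {
          runCompaction(pendingCompactionInstants.poll());
        }
        // 2. Only then decide whether a fresh compaction is due.
        if (compactionDue(instantTime)) {
          runCompaction(scheduleCompaction(instantTime));
        }
      }

      private void runCompaction(String instant) {
        // execute the compaction plan and commit its instant
      }

      private boolean compactionDue(String instantTime) {
        return false; // placeholder trigger policy
      }

      private String scheduleCompaction(String instantTime) {
        return instantTime + "001"; // placeholder compaction instant time
      }
    }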

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index b7eb5f437bdf..3fc5fbd9c179 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -734,7 +734,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
 
   protected HoodieTable createTableAndValidate(HoodieWriteConfig config,
                                                BiFunction<HoodieWriteConfig,
-                                                   HoodieEngineContext, HoodieTable> createTableFn,
+                                               HoodieEngineContext, HoodieTable> createTableFn,
                                                boolean skipValidation) {
     HoodieTable table = createTableFn.apply(config, context);
     if (!skipValidation) {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index 73c9025dec53..a0580175ea2f 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -65,14 +65,6 @@ public class HoodieFlinkTableServiceClient<T> extends BaseHoodieTableServiceClie
     super(context, clientConfig, timelineService);
   }
 
-  @Override
-  protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
-    // only used for metadata table, the compaction happens in single thread
-    HoodieWriteMetadata<List<WriteStatus>> compactionMetadata = createTable(config, storageConf).compact(context, compactionInstantTime);
-    commitCompaction(compactionInstantTime, compactionMetadata, Option.empty());
-    return compactionMetadata;
-  }
-
   @Override
  protected TableWriteStats triggerWritesAndFetchWriteStats(HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
    return new TableWriteStats(writeMetadata.getWriteStatuses().stream().map(writeStatus -> writeStatus.getStat()).collect(Collectors.toList()));
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index ac3fb32dfab0..d380b934de27 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -362,12 +362,6 @@ public class HoodieFlinkWriteClient<T>
     autoArchiveOnCommit(table);
   }
 
-  @Override
-  public HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
-    // only used for metadata table, the compaction happens in single thread
-    return tableServiceClient.compact(compactionInstantTime, shouldComplete);
-  }
-
   @Override
  public HoodieWriteMetadata<List<WriteStatus>> cluster(final String clusteringInstant, final boolean shouldComplete) {
     throw new HoodieNotSupportedException("Clustering is not supported yet");
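
Note: the two compact(String, boolean) overrides removed above were a
Flink-only shortcut that compacted the metadata table inline; as the removed
bodies show, they executed the given compaction instant directly without
consulting the timeline for compactions already pending. With them gone,
compaction flows through the common BaseHoodieTableServiceClient path.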
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index f5dfb0dae38c..aa2c04db44e4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -31,12 +31,9 @@ import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.table.BulkInsertPartitioner;
@@ -126,60 +123,9 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
   @Override
  protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
                                Option<BulkInsertPartitioner> bulkInsertPartitioner) {
-    ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet.");
-    HoodieData<HoodieRecord> preppedRecords = tagRecordsWithLocation(partitionRecordsMap, isInitializing).getKey();
-    List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();
-
-    //  Flink engine does not optimize initialCommit to MDT as bulk insert is not yet supported
-
-    BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>> writeClient = (BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>>) getWriteClient();
-    // rollback partially failed writes if any.
-    if (writeClient.rollbackFailedWrites(metadataMetaClient)) {
-      metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
-    }
-
-    compactIfNecessary(writeClient, Option.empty());
-
-    if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
-      // if this is a new commit being applied to metadata for the first time
-      LOG.info("New commit at " + instantTime + " being applied to MDT.");
-    } else {
-      // this code path refers to a re-attempted commit that:
-      //   1. got committed to metadata table, but failed in datatable.
-      //   2. failed while committing to metadata table
-      // for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
-      // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
-      // are upserts to metadata table and so only a new delta commit will be created.
-      // once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
-      // already part of completed commit. So, we have to manually rollback the completed instant and proceed.
-      Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.requestedTime().equals(instantTime))
-          .lastInstant();
-      LOG.info(String.format("%s completed commit at %s being applied to MDT.",
-          alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime));
-
-      // Rollback the previous commit
-      if (!writeClient.rollback(instantTime)) {
-        throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
-      }
-      metadataMetaClient.reloadActiveTimeline();
-    }
-
-    writeClient.startCommitForMetadataTable(metadataMetaClient, instantTime, HoodieActiveTimeline.DELTA_COMMIT_ACTION);
-    preWrite(instantTime);
-
-    List<WriteStatus> statuses = isInitializing
-        ? writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, bulkInsertPartitioner)
-        : writeClient.upsertPreppedRecords(preppedRecordList, instantTime);
-    // flink does not support auto-commit yet, also the auto commit logic is not complete as BaseHoodieWriteClient now.
-    writeClient.commit(instantTime, statuses, Option.empty(), HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
-
-    // reload timeline
+    performTableServices(Option.ofNullable(instantTime), false);
     metadataMetaClient.reloadActiveTimeline();
-    cleanIfNecessary(writeClient, "");
-    writeClient.archive();
-
-    // Update total size of the metadata and count of base/log files
-    metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata, dataMetaClient.getTableConfig().getMetadataPartitions()));
+    super.commitInternal(instantTime, partitionRecordsMap, isInitializing, bulkInsertPartitioner);
   }
 
   @Override
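
Note: pieced together from the added lines above, the patched commitInternal
reduces to the shape below; the pending-compaction handling now lives behind
the base class rather than in a Flink-only copy:

    @Override
    protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieRecord>> partitionRecordsMap,
                                  boolean isInitializing, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
      // run metadata-table services up front; per the commit message this
      // finishes pending compactions before scheduling a new one
      performTableServices(Option.ofNullable(instantTime), false);
      metadataMetaClient.reloadActiveTimeline();
      // delegate the actual delta commit to the shared implementation
      super.commitInternal(instantTime, partitionRecordsMap, isInitializing, bulkInsertPartitioner);
    }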
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index e521f828ed15..8a79c0ddbf11 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -362,7 +362,17 @@ public class TestStreamWriteOperatorCoordinator {
     metadataTableMetaClient.reloadActiveTimeline();
    completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
    assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(14));
-    assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
+    HoodieInstant compactionInstant = completedTimeline.nthFromLastInstant(1).get();
+    assertThat(compactionInstant.getAction(), is(HoodieTimeline.COMMIT_ACTION));
+
+    // remove the last compaction completed file and write another commit
+    TestUtils.deleteInstantFile(metadataTableMetaClient, compactionInstant);
+    assertTrue(metadataTableMetaClient.reloadActiveTimeline().filterPendingCompactionTimeline().containsInstant(compactionInstant.requestedTime()));
+    mockWriteWithMetadata(ckp++);
+    metadataTableMetaClient.reloadActiveTimeline();
+    completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
+    assertThat("The pending compaction should be recommitted",
+        completedTimeline.nthFromLastInstant(3).get(), is(compactionInstant));
   }
 
   @Test
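
Note on the test: TestUtils.deleteInstantFile drops the completed instant of
the last metadata-table compaction, so on reload that compaction appears
pending again (the assertTrue checks this). The next mockWriteWithMetadata
checkpoint should then recommit the pending compaction before its own delta
commit, which is why the recommitted compactionInstant is asserted a few
positions back from the tail of the timeline (nthFromLastInstant(3)).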
