This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 8e2a32e994a4a96211384195202ded38e71890b5
Author: Danny Chan <[email protected]>
AuthorDate: Thu Oct 16 19:05:52 2025 +0800

    fix: flink mdt compaction should finish pending compactions first (#14095)
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |  2 +-
 .../hudi/client/HoodieFlinkTableServiceClient.java |  8 ---
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  6 ---
 .../FlinkHoodieBackedTableMetadataWriter.java      | 58 +---------------------
 .../sink/TestStreamWriteOperatorCoordinator.java   | 12 ++++-
 5 files changed, 14 insertions(+), 72 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index b7eb5f437bdf..3fc5fbd9c179 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -734,7 +734,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
 
   protected HoodieTable createTableAndValidate(HoodieWriteConfig config,
                                                BiFunction<HoodieWriteConfig,
-                                                   HoodieEngineContext, HoodieTable> createTableFn,
+                                               HoodieEngineContext, HoodieTable> createTableFn,
                                                boolean skipValidation) {
     HoodieTable table = createTableFn.apply(config, context);
     if (!skipValidation) {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index 73c9025dec53..a0580175ea2f 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -65,14 +65,6 @@ public class HoodieFlinkTableServiceClient<T> extends BaseHoodieTableServiceClie
     super(context, clientConfig, timelineService);
   }
 
-  @Override
-  protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
-    // only used for metadata table, the compaction happens in single thread
-    HoodieWriteMetadata<List<WriteStatus>> compactionMetadata = createTable(config, storageConf).compact(context, compactionInstantTime);
-    commitCompaction(compactionInstantTime, compactionMetadata, Option.empty());
-    return compactionMetadata;
-  }
-
   @Override
   protected TableWriteStats triggerWritesAndFetchWriteStats(HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
     return new TableWriteStats(writeMetadata.getWriteStatuses().stream().map(writeStatus -> writeStatus.getStat()).collect(Collectors.toList()));
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index ac3fb32dfab0..d380b934de27 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -362,12 +362,6 @@ public class HoodieFlinkWriteClient<T>
     autoArchiveOnCommit(table);
  }
 
-  @Override
-  public HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
-    // only used for metadata table, the compaction happens in single thread
-    return tableServiceClient.compact(compactionInstantTime, shouldComplete);
-  }
-
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> cluster(final String clusteringInstant, final boolean shouldComplete) {
     throw new HoodieNotSupportedException("Clustering is not supported yet");
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index f5dfb0dae38c..aa2c04db44e4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -31,12 +31,9 @@ import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.table.BulkInsertPartitioner;
@@ -126,60 +123,9 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
   @Override
   protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
                                 Option<BulkInsertPartitioner> bulkInsertPartitioner) {
-    ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet.");
-    HoodieData<HoodieRecord> preppedRecords = tagRecordsWithLocation(partitionRecordsMap, isInitializing).getKey();
-    List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();
-
-    // Flink engine does not optimize initialCommit to MDT as bulk insert is not yet supported
-
-    BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>> writeClient = (BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>>) getWriteClient();
-    // rollback partially failed writes if any.
-    if (writeClient.rollbackFailedWrites(metadataMetaClient)) {
-      metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
-    }
-
-    compactIfNecessary(writeClient, Option.empty());
-
-    if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
-      // if this is a new commit being applied to metadata for the first time
-      LOG.info("New commit at " + instantTime + " being applied to MDT.");
-    } else {
-      // this code path refers to a re-attempted commit that:
-      // 1. got committed to metadata table, but failed in datatable.
-      // 2. failed while committing to metadata table
-      // for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
-      // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
-      // are upserts to metadata table and so only a new delta commit will be created.
-      // once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
-      // already part of completed commit. So, we have to manually rollback the completed instant and proceed.
-      Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.requestedTime().equals(instantTime))
-          .lastInstant();
-      LOG.info(String.format("%s completed commit at %s being applied to MDT.",
-          alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime));
-
-      // Rollback the previous commit
-      if (!writeClient.rollback(instantTime)) {
-        throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
-      }
-      metadataMetaClient.reloadActiveTimeline();
-    }
-
-    writeClient.startCommitForMetadataTable(metadataMetaClient, instantTime, HoodieActiveTimeline.DELTA_COMMIT_ACTION);
-    preWrite(instantTime);
-
-    List<WriteStatus> statuses = isInitializing
-        ? writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, bulkInsertPartitioner)
-        : writeClient.upsertPreppedRecords(preppedRecordList, instantTime);
-    // flink does not support auto-commit yet, also the auto commit logic is not complete as BaseHoodieWriteClient now.
-    writeClient.commit(instantTime, statuses, Option.empty(), HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
-
-    // reload timeline
+    performTableServices(Option.ofNullable(instantTime), false);
     metadataMetaClient.reloadActiveTimeline();
-    cleanIfNecessary(writeClient, "");
-    writeClient.archive();
-
-    // Update total size of the metadata and count of base/log files
-    metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata, dataMetaClient.getTableConfig().getMetadataPartitions()));
+    super.commitInternal(instantTime, partitionRecordsMap, isInitializing, bulkInsertPartitioner);
   }
 
   @Override
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index e521f828ed15..8a79c0ddbf11 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -362,7 +362,17 @@ public class TestStreamWriteOperatorCoordinator {
     metadataTableMetaClient.reloadActiveTimeline();
     completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
     assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(14));
-    assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
+    HoodieInstant compactionInstant = completedTimeline.nthFromLastInstant(1).get();
+    assertThat(compactionInstant.getAction(), is(HoodieTimeline.COMMIT_ACTION));
+
+    // remove the last compaction completed file and write another commit
+    TestUtils.deleteInstantFile(metadataTableMetaClient, compactionInstant);
+    assertTrue(metadataTableMetaClient.reloadActiveTimeline().filterPendingCompactionTimeline().containsInstant(compactionInstant.requestedTime()));
+    mockWriteWithMetadata(ckp++);
+    metadataTableMetaClient.reloadActiveTimeline();
+    completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
+    assertThat("The pending compaction should be recommitted",
+        completedTimeline.nthFromLastInstant(3).get(), is(compactionInstant));
   }
 
   @Test
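The gist of the change: the Flink-specific compact() and commitInternal() overrides are deleted, so metadata-table commits now flow through performTableServices() before delegating to the base commitInternal(), and (per the commit subject) any compaction left pending on the MDT timeline is finished before a new one is scheduled. Below is a minimal sketch of that ordering only; Timeline and Compactor are hypothetical stand-ins for illustration, not Hudi classes.

    import java.util.List;

    // Sketch of the "finish pending compactions first" ordering the fix
    // enforces; Timeline and Compactor are invented stand-ins.
    final class PendingFirstCompaction {

      interface Timeline {
        // Instants of compactions that were scheduled but never completed.
        List<String> pendingCompactionInstants();
      }

      interface Compactor {
        // Executes and commits the compaction plan at the given instant.
        void execute(String instantTime);

        // Schedules a new compaction if thresholds are met; null otherwise.
        String scheduleIfNecessary();
      }

      static void runCompactions(Timeline timeline, Compactor compactor) {
        // 1. Re-attempt every compaction left pending by an earlier failure,
        //    e.g. a job that crashed between the metadata-table commit and
        //    the data-table commit.
        for (String pending : timeline.pendingCompactionInstants()) {
          compactor.execute(pending);
        }
        // 2. Only after the backlog is drained, consider scheduling a new one.
        String fresh = compactor.scheduleIfNecessary();
        if (fresh != null) {
          compactor.execute(fresh);
        }
      }
    }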

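The added test coverage exercises exactly that recovery path: it deletes the completed-instant file of the last MDT compaction to simulate a completion lost mid-commit, checks the instant reappears as pending, and verifies the next checkpoint commit re-completes it. A hedged JUnit-style sketch of the scenario's shape follows; FakeMetaClient is an invented test double, not a Hudi API.

    import static org.junit.jupiter.api.Assertions.assertTrue;

    import java.util.Set;

    // Shape of the new test scenario; FakeMetaClient is an invented double.
    class PendingCompactionRecommitSketch {

      interface FakeMetaClient {
        String lastCompletedCompactionInstant();
        void deleteCompletedInstantFile(String instantTime); // simulate lost completion
        Set<String> pendingCompactionInstants();
        Set<String> completedCompactionInstants();
        void writeOneCommit(); // a write that also runs table services
      }

      void recommitsLostCompaction(FakeMetaClient metaClient) {
        String instant = metaClient.lastCompletedCompactionInstant();

        // Dropping the completed-instant file makes the compaction pending again.
        metaClient.deleteCompletedInstantFile(instant);
        assertTrue(metaClient.pendingCompactionInstants().contains(instant));

        // The next commit must finish the pending compaction before
        // scheduling any new table service.
        metaClient.writeOneCommit();
        assertTrue(metaClient.completedCompactionInstants().contains(instant));
      }
    }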