This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit bfc236b0398fc6f4bb732cc23785c9a988ad4dfe Author: vamsikarnika <[email protected]> AuthorDate: Tue Oct 28 10:28:51 2025 +0530 fix: fix downgrade to not delete unintended partitions in MDT (#14162) --------- Co-authored-by: Vamsi <[email protected]> Co-authored-by: sivabalan <[email protected]> --- .../apache/hudi/client/BaseHoodieWriteClient.java | 2 +- .../java/org/apache/hudi/table/HoodieTable.java | 12 +-- .../hudi/table/upgrade/UpgradeDowngrade.java | 2 +- .../org/apache/hudi/table/HoodieFlinkTable.java | 3 +- .../org/apache/hudi/table/HoodieJavaTable.java | 9 ++- .../hudi/client/StreamingMetadataWriteHandler.java | 2 +- .../org/apache/hudi/table/HoodieSparkTable.java | 7 +- .../hudi/table/upgrade/TestUpgradeDowngrade.java | 91 ++++++++++++++++++++-- 8 files changed, 106 insertions(+), 22 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index e5880bd29a54..cc19970fb60c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -1119,7 +1119,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient }); }); - Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(dropInstant); + Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(dropInstant, false, false); // first update table config. Metadata writer initializes the inflight metadata // partitions so we need to first remove the metadata before creating the writer // Also the partitions need to be removed after creating the metadata writer since the writer diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 1438b7e59712..4ae51ac75298 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -1085,7 +1085,7 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { * @return instance of {@link HoodieTableMetadataWriter} */ public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp) { - return getMetadataWriter(triggeringInstantTimestamp, false); + return getMetadataWriter(triggeringInstantTimestamp, false, true); } /** @@ -1094,8 +1094,8 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { * @param triggeringInstantTimestamp - The instant that is triggering this metadata write * @return instance of {@link HoodieTableMetadataWriter} */ - public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp, boolean streamingWrites) { - return getMetadataWriter(triggeringInstantTimestamp, EAGER, streamingWrites); + public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp, boolean streamingWrites, boolean autoDetectAndDeleteMetadataPartitions) { + return getMetadataWriter(triggeringInstantTimestamp, EAGER, streamingWrites, autoDetectAndDeleteMetadataPartitions); } /** @@ -1105,7 +1105,7 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { * @return An instance of {@link HoodieTableMetadataWriter}. */ public Option<HoodieTableMetadataWriter> getIndexingMetadataWriter(String triggeringInstantTimestamp) { - return getMetadataWriter(triggeringInstantTimestamp, LAZY, false); + return getMetadataWriter(triggeringInstantTimestamp, LAZY, false, false); } /** @@ -1121,12 +1121,14 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { * @param triggeringInstantTimestamp The instant that is triggering this metadata write * @param failedWritesCleaningPolicy Cleaning policy on failed writes * @param streamingWrites Whether streaming write is enabled + * @param autoDetectAndDeleteMetadataPartitions true when metadata partitions could be deleted based on incoming write config properties. * @return instance of {@link HoodieTableMetadataWriter} */ protected Option<HoodieTableMetadataWriter> getMetadataWriter( String triggeringInstantTimestamp, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, - boolean streamingWrites) { + boolean streamingWrites, + boolean autoDetectAndDeleteMetadataPartitions) { // Each engine is expected to override this and // provide the actual metadata writer, if enabled. return Option.empty(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java index bc643313b99a..581452afa747 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java @@ -269,7 +269,7 @@ public class UpgradeDowngrade { HoodieTable table = upgradeDowngradeHelper.getTable(updatedConfig, context); String newInstant = table.getMetaClient().createNewInstantTime(false); - Option<HoodieTableMetadataWriter> mdtWriterOpt = table.getMetadataWriter(newInstant); + Option<HoodieTableMetadataWriter> mdtWriterOpt = table.getMetadataWriter(newInstant, false, false); mdtWriterOpt.ifPresent(mdtWriter -> { HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); commitMetadata.setOperationType(WriteOperationType.UPSERT); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index 403526d23aba..910a66a2c3f0 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -100,7 +100,8 @@ public abstract class HoodieFlinkTable<T> protected Option<HoodieTableMetadataWriter> getMetadataWriter( String triggeringInstantTimestamp, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, - boolean streamingWrites) { + boolean streamingWrites, + boolean autoDetectAndDeleteMetadataPartitions) { if (isMetadataTable()) { return Option.empty(); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java index 77200c69cf5e..32d6e7d281ec 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java @@ -81,7 +81,8 @@ public abstract class HoodieJavaTable<T> @Override protected Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, - boolean streamingWrites) { + boolean streamingWrites, + boolean autoDetectAndDeleteMetadataPartitions) { if (isMetadataTable()) { return Option.empty(); } @@ -93,8 +94,10 @@ public abstract class HoodieJavaTable<T> getContext().getStorageConf(), config, failedWritesCleaningPolicy, getContext(), Option.of(triggeringInstantTimestamp)); // even with metadata enabled, some index could have been disabled - // delete metadata partitions corresponding to such indexes - deleteMetadataIndexIfNecessary(); + // delete metadata partitions corresponding to such indexes if autoDetectAndDeleteMdtPartitions is enabled + if (autoDetectAndDeleteMetadataPartitions) { + deleteMetadataIndexIfNecessary(); + } try { if (isMetadataTableExists || metaClient.getStorage().exists( HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java index 93cf377cb6be..470f10013ccb 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java @@ -137,7 +137,7 @@ public class StreamingMetadataWriteHandler { return this.metadataWriterMap.get(triggeringInstant); } - Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(triggeringInstant, true); + Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(triggeringInstant, true, true); metadataWriterMap.put(triggeringInstant, metadataWriterOpt); // populate this for every new instant time. // if metadata table does not exist, the map will contain an entry, with value Option.empty. // if not, it will contain the metadata writer instance. diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java index a9276fd37902..fa66af392713 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java @@ -96,14 +96,17 @@ public abstract class HoodieSparkTable<T> protected Option<HoodieTableMetadataWriter> getMetadataWriter( String triggeringInstantTimestamp, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, - boolean streamingWrites) { + boolean streamingWrites, + boolean autoDetectAndDeleteMetadataPartitions) { if (isMetadataTable()) { return Option.empty(); } if (config.isMetadataTableEnabled()) { // if any partition is deleted, we need to reload the metadata table writer so that new table configs are picked up // to reflect the delete mdt partitions. - deleteMetadataIndexIfNecessary(); + if (autoDetectAndDeleteMetadataPartitions) { + deleteMetadataIndexIfNecessary(); + } // Create the metadata table writer. First time after the upgrade this creation might trigger // metadata table bootstrapping. Bootstrapping process could fail and checking the table diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java index 5ae4fc222a1c..032eda704eb7 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java @@ -18,8 +18,13 @@ package org.apache.hudi.table.upgrade; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteClientTestUtils; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.model.HoodieIndexMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; @@ -27,15 +32,18 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.InstantFileNameGenerator; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieUpgradeDowngradeException; import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.junit.jupiter.api.Disabled; @@ -49,6 +57,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.URI; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -57,6 +66,7 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC; import static org.apache.hudi.keygen.KeyGenUtils.getComplexKeygenErrorMessage; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -98,31 +108,31 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness { boolean isUpgrade = fromVersion.lesserThan(toVersion); String operation = isUpgrade ? "upgrade" : "downgrade"; LOG.info("Testing {} from version {} to {}", operation, fromVersion, toVersion); - + HoodieTableMetaClient originalMetaClient = loadFixtureTable(fromVersion, suffix); assertEquals(fromVersion, originalMetaClient.getTableConfig().getTableVersion(), "Fixture table should be at expected version"); - + HoodieWriteConfig config = createWriteConfig(originalMetaClient, true); - + int initialPendingCommits = originalMetaClient.getCommitsTimeline().filterPendingExcludingCompaction().countInstants(); int initialCompletedCommits = originalMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants(); - + Dataset<Row> originalData = readTableData(originalMetaClient, "before " + operation); - + // Confirm that there are log files before rollback and compaction operations if (isRollbackAndCompactTransition(fromVersion, toVersion)) { validateLogFilesCount(originalMetaClient, operation, suffix.equals("-mor")); } - + new UpgradeDowngrade(originalMetaClient, config, context(), SparkUpgradeDowngradeHelper.getInstance()) .run(toVersion, null); - + HoodieTableMetaClient resultMetaClient = HoodieTableMetaClient.builder() .setConf(storageConf().newInstance()) .setBasePath(originalMetaClient.getBasePath()) .build(); - + assertTableVersionOnDataAndMetadataTable(resultMetaClient, toVersion); validateVersionSpecificProperties(resultMetaClient, toVersion); validateDataConsistency(originalData, resultMetaClient, "after " + operation); @@ -376,6 +386,62 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness { } } + @ParameterizedTest + @MethodSource("testMdtValidationDowngrade") + public void testMdtPartitionNotDroppedWhenDowngradedFromTableVersionNine(HoodieTableType tableType, boolean mdtEnabled) throws Exception { + HoodieTableVersion fromVersion = HoodieTableVersion.NINE; + HoodieTableVersion toVersion = HoodieTableVersion.EIGHT; + + Properties props = new Properties(); + props.put(HoodieTableConfig.TYPE.key(), tableType.name()); + HoodieTableMetaClient metaClient = + getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), props); + + HoodieWriteConfig writeConfig = getConfigBuilder(true) + .withPath(metaClient.getBasePath()) + .withWriteTableVersion(fromVersion.versionCode()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withEnableRecordIndex(true).build()) + .withProps(props) + .build(); + + SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig); + String partitionPath = "2021/09/11"; + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionPath}); + + String instant1 = getCommitTimeAtUTC(1); + List<HoodieRecord> records = dataGenerator.generateInserts(instant1, 100); + JavaRDD<HoodieRecord> dataset = jsc().parallelize(records, 2); + + WriteClientTestUtils.startCommitWithTime(writeClient, instant1); + writeClient.commit(instant1, writeClient.insert(dataset, instant1)); + metaClient.reloadTableConfig(); + + // verify record index partition exists before downgrade + assertTrue(metaClient.getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath())); + + HoodieWriteConfig.Builder upgradeWriteConfig = HoodieWriteConfig.newBuilder() + .withPath(metaClient.getBasePath()) + .withProps(props); + if (mdtEnabled) { + upgradeWriteConfig.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(false).build()); + } else { + upgradeWriteConfig.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()); + } + + new UpgradeDowngrade(metaClient, upgradeWriteConfig.build(), context(), SparkUpgradeDowngradeHelper.getInstance()) + .run(toVersion, null); + + HoodieTableMetaClient resultMetaClient = HoodieTableMetaClient.builder() + .setConf(storageConf().newInstance()) + .setBasePath(metaClient.getBasePath()) + .build(); + + resultMetaClient.reloadTableConfig(); + // verify record index partition exists after downgrade + assertTrue(resultMetaClient.getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath())); + } + /** * Load a fixture table from resources and copy it to a temporary location for testing. */ @@ -536,6 +602,15 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness { ); } + private static Stream<Arguments> testMdtValidationDowngrade() { + return Stream.of( + Arguments.of(HoodieTableType.COPY_ON_WRITE, true), + Arguments.of(HoodieTableType.COPY_ON_WRITE, false), + Arguments.of(HoodieTableType.MERGE_ON_READ, true), + Arguments.of(HoodieTableType.MERGE_ON_READ, false) + ); + } + private static Stream<Arguments> testArgsPayloadUpgradeDowngrade() { String[] payloadTypes = { "default", "overwrite", "partial", "postgres", "mysql",
