This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit bfc236b0398fc6f4bb732cc23785c9a988ad4dfe
Author: vamsikarnika <[email protected]>
AuthorDate: Tue Oct 28 10:28:51 2025 +0530

    fix: fix downgrade to not delete unintended partitions in MDT (#14162)
    
    
    ---------
    
    Co-authored-by: Vamsi <[email protected]>
    Co-authored-by: sivabalan <[email protected]>
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  2 +-
 .../java/org/apache/hudi/table/HoodieTable.java    | 12 +--
 .../hudi/table/upgrade/UpgradeDowngrade.java       |  2 +-
 .../org/apache/hudi/table/HoodieFlinkTable.java    |  3 +-
 .../org/apache/hudi/table/HoodieJavaTable.java     |  9 ++-
 .../hudi/client/StreamingMetadataWriteHandler.java |  2 +-
 .../org/apache/hudi/table/HoodieSparkTable.java    |  7 +-
 .../hudi/table/upgrade/TestUpgradeDowngrade.java   | 91 ++++++++++++++++++++--
 8 files changed, 106 insertions(+), 22 deletions(-)
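
The gist of the fix: creating a metadata table writer used to reconcile the
table config against the incoming write config and eagerly drop any MDT
partitions whose indexes that config no longer enables. Upgrade/downgrade and
drop-index flows build a minimal internal write config, so partitions such as
record_index could be deleted unintentionally. The patch threads a new
autoDetectAndDeleteMetadataPartitions flag through getMetadataWriter so those
flows can opt out. A minimal sketch of the resulting shape (method body
elided; names taken from the diff below):

    // Sketch only: regular write paths pass true, maintenance flows false.
    protected Option<HoodieTableMetadataWriter> getMetadataWriter(
        String triggeringInstantTimestamp,
        HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
        boolean streamingWrites,
        boolean autoDetectAndDeleteMetadataPartitions) {
      if (config.isMetadataTableEnabled()) {
        if (autoDetectAndDeleteMetadataPartitions) {
          // drop MDT partitions for indexes the write config has disabled
          deleteMetadataIndexIfNecessary();
        }
        // ... create and return the engine-specific metadata writer ...
      }
      return Option.empty();
    }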

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index e5880bd29a54..cc19970fb60c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1119,7 +1119,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
         });
       });
 
-      Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(dropInstant);
+      Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(dropInstant, false, false);
       // first update table config. Metadata writer initializes the inflight metadata
       // partitions so we need to first remove the metadata before creating the writer
       // Also the partitions need to be removed after creating the metadata writer since the writer
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 1438b7e59712..4ae51ac75298 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -1085,7 +1085,7 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
    * @return instance of {@link HoodieTableMetadataWriter}
    */
   public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp) {
-    return getMetadataWriter(triggeringInstantTimestamp, false);
+    return getMetadataWriter(triggeringInstantTimestamp, false, true);
   }
 
   /**
@@ -1094,8 +1094,8 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
   * @param triggeringInstantTimestamp - The instant that is triggering this metadata write
    * @return instance of {@link HoodieTableMetadataWriter}
    */
-  public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp, boolean streamingWrites) {
-    return getMetadataWriter(triggeringInstantTimestamp, EAGER, streamingWrites);
+  public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp, boolean streamingWrites, boolean autoDetectAndDeleteMetadataPartitions) {
+    return getMetadataWriter(triggeringInstantTimestamp, EAGER, streamingWrites, autoDetectAndDeleteMetadataPartitions);
   }
 
   /**
@@ -1105,7 +1105,7 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
    * @return An instance of {@link HoodieTableMetadataWriter}.
    */
   public Option<HoodieTableMetadataWriter> getIndexingMetadataWriter(String triggeringInstantTimestamp) {
-    return getMetadataWriter(triggeringInstantTimestamp, LAZY, false);
+    return getMetadataWriter(triggeringInstantTimestamp, LAZY, false, false);
   }
 
   /**
@@ -1121,12 +1121,14 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
   * @param triggeringInstantTimestamp The instant that is triggering this metadata write
    * @param failedWritesCleaningPolicy Cleaning policy on failed writes
    * @param streamingWrites            Whether streaming write is enabled
+   * @param autoDetectAndDeleteMetadataPartitions true when metadata partitions could be deleted based on incoming write config properties.
    * @return instance of {@link HoodieTableMetadataWriter}
    */
   protected Option<HoodieTableMetadataWriter> getMetadataWriter(
       String triggeringInstantTimestamp,
       HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
-      boolean streamingWrites) {
+      boolean streamingWrites,
+      boolean autoDetectAndDeleteMetadataPartitions) {
     // Each engine is expected to override this and
     // provide the actual metadata writer, if enabled.
     return Option.empty();
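
How the call sites in this patch resolve the new flag (a summary of the hunks
below; the instant-time variable names are illustrative):

    // regular write path: keeps the legacy auto-delete behavior
    table.getMetadataWriter(instantTime);                // (EAGER, false, true)
    // streaming writes (StreamingMetadataWriteHandler)
    table.getMetadataWriter(instantTime, true, true);    // (EAGER, true, true)
    // async indexing: never auto-deletes
    table.getIndexingMetadataWriter(instantTime);        // (LAZY, false, false)
    // downgrade (UpgradeDowngrade) and drop-index flows: never auto-delete
    table.getMetadataWriter(instantTime, false, false);  // (EAGER, false, false)
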
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
index bc643313b99a..581452afa747 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -269,7 +269,7 @@ public class UpgradeDowngrade {
 
       HoodieTable table = upgradeDowngradeHelper.getTable(updatedConfig, context);
       String newInstant = table.getMetaClient().createNewInstantTime(false);
-      Option<HoodieTableMetadataWriter> mdtWriterOpt = table.getMetadataWriter(newInstant);
+      Option<HoodieTableMetadataWriter> mdtWriterOpt = table.getMetadataWriter(newInstant, false, false);
       mdtWriterOpt.ifPresent(mdtWriter -> {
         HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
         commitMetadata.setOperationType(WriteOperationType.UPSERT);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 403526d23aba..910a66a2c3f0 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -100,7 +100,8 @@ public abstract class HoodieFlinkTable<T>
   protected Option<HoodieTableMetadataWriter> getMetadataWriter(
       String triggeringInstantTimestamp,
       HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
-      boolean streamingWrites) {
+      boolean streamingWrites,
+      boolean autoDetectAndDeleteMetadataPartitions) {
     if (isMetadataTable()) {
       return Option.empty();
     }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
index 77200c69cf5e..32d6e7d281ec 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -81,7 +81,8 @@ public abstract class HoodieJavaTable<T>
   @Override
   protected Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
                                                                 HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
-                                                                boolean streamingWrites) {
+                                                                boolean streamingWrites,
+                                                                boolean autoDetectAndDeleteMetadataPartitions) {
     if (isMetadataTable()) {
       return Option.empty();
     }
@@ -93,8 +94,10 @@ public abstract class HoodieJavaTable<T>
           getContext().getStorageConf(), config, failedWritesCleaningPolicy, getContext(),
           Option.of(triggeringInstantTimestamp));
       // even with metadata enabled, some index could have been disabled
-      // delete metadata partitions corresponding to such indexes
-      deleteMetadataIndexIfNecessary();
+      // delete metadata partitions corresponding to such indexes if autoDetectAndDeleteMdtPartitions is enabled
+      if (autoDetectAndDeleteMetadataPartitions) {
+        deleteMetadataIndexIfNecessary();
+      }
       try {
         if (isMetadataTableExists || metaClient.getStorage().exists(
             HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java
index 93cf377cb6be..470f10013ccb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java
@@ -137,7 +137,7 @@ public class StreamingMetadataWriteHandler {
       return this.metadataWriterMap.get(triggeringInstant);
     }
 
-    Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(triggeringInstant, true);
+    Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(triggeringInstant, true, true);
     metadataWriterMap.put(triggeringInstant, metadataWriterOpt); // populate this for every new instant time.
     // if metadata table does not exist, the map will contain an entry, with value Option.empty.
     // if not, it will contain the metadata writer instance.
     // if not, it will contain the metadata writer instance.
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index a9276fd37902..fa66af392713 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -96,14 +96,17 @@ public abstract class HoodieSparkTable<T>
   protected Option<HoodieTableMetadataWriter> getMetadataWriter(
       String triggeringInstantTimestamp,
       HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
-      boolean streamingWrites) {
+      boolean streamingWrites,
+      boolean autoDetectAndDeleteMetadataPartitions) {
     if (isMetadataTable()) {
       return Option.empty();
     }
     if (config.isMetadataTableEnabled()) {
       // if any partition is deleted, we need to reload the metadata table writer so that new table configs are picked up
       // to reflect the delete mdt partitions.
-      deleteMetadataIndexIfNecessary();
+      if (autoDetectAndDeleteMetadataPartitions) {
+        deleteMetadataIndexIfNecessary();
+      }
 
       // Create the metadata table writer. First time after the upgrade this creation might trigger
       // metadata table bootstrapping. Bootstrapping process could fail and checking the table
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 5ae4fc222a1c..032eda704eb7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -18,8 +18,13 @@
 
 package org.apache.hudi.table.upgrade;
 
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -27,15 +32,18 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Disabled;
@@ -49,6 +57,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -57,6 +66,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
 import static org.apache.hudi.keygen.KeyGenUtils.getComplexKeygenErrorMessage;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -98,31 +108,31 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness {
     boolean isUpgrade = fromVersion.lesserThan(toVersion);
     String operation = isUpgrade ? "upgrade" : "downgrade";
     LOG.info("Testing {} from version {} to {}", operation, fromVersion, 
toVersion);
-    
+
     HoodieTableMetaClient originalMetaClient = loadFixtureTable(fromVersion, suffix);
     assertEquals(fromVersion, originalMetaClient.getTableConfig().getTableVersion(),
         "Fixture table should be at expected version");
-    
+
     HoodieWriteConfig config = createWriteConfig(originalMetaClient, true);
-    
+
     int initialPendingCommits = originalMetaClient.getCommitsTimeline().filterPendingExcludingCompaction().countInstants();
     int initialCompletedCommits = originalMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants();
-    
+
     Dataset<Row> originalData = readTableData(originalMetaClient, "before " + operation);
-    
+
     // Confirm that there are log files before rollback and compaction operations
     if (isRollbackAndCompactTransition(fromVersion, toVersion)) {
       validateLogFilesCount(originalMetaClient, operation, suffix.equals("-mor"));
     }
-    
+
     new UpgradeDowngrade(originalMetaClient, config, context(), SparkUpgradeDowngradeHelper.getInstance())
         .run(toVersion, null);
-    
+
     HoodieTableMetaClient resultMetaClient = HoodieTableMetaClient.builder()
         .setConf(storageConf().newInstance())
         .setBasePath(originalMetaClient.getBasePath())
         .build();
-    
+
     assertTableVersionOnDataAndMetadataTable(resultMetaClient, toVersion);
     validateVersionSpecificProperties(resultMetaClient, toVersion);
     validateDataConsistency(originalData, resultMetaClient, "after " + operation);
@@ -376,6 +386,62 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness {
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("testMdtValidationDowngrade")
+  public void testMdtPartitionNotDroppedWhenDowngradedFromTableVersionNine(HoodieTableType tableType, boolean mdtEnabled) throws Exception {
+    HoodieTableVersion fromVersion = HoodieTableVersion.NINE;
+    HoodieTableVersion toVersion = HoodieTableVersion.EIGHT;
+
+    Properties props = new Properties();
+    props.put(HoodieTableConfig.TYPE.key(), tableType.name());
+    HoodieTableMetaClient metaClient =
+        getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), props);
+
+    HoodieWriteConfig writeConfig = getConfigBuilder(true)
+        .withPath(metaClient.getBasePath())
+        .withWriteTableVersion(fromVersion.versionCode())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+                .withEnableRecordIndex(true).build())
+        .withProps(props)
+        .build();
+
+    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig);
+    String partitionPath = "2021/09/11";
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionPath});
+
+    String instant1 = getCommitTimeAtUTC(1);
+    List<HoodieRecord> records = dataGenerator.generateInserts(instant1, 100);
+    JavaRDD<HoodieRecord> dataset = jsc().parallelize(records, 2);
+
+    WriteClientTestUtils.startCommitWithTime(writeClient, instant1);
+    writeClient.commit(instant1, writeClient.insert(dataset, instant1));
+    metaClient.reloadTableConfig();
+
+    // verify record index partition exists before downgrade
+    assertTrue(metaClient.getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath()));
+
+    HoodieWriteConfig.Builder upgradeWriteConfig = HoodieWriteConfig.newBuilder()
+        .withPath(metaClient.getBasePath())
+        .withProps(props);
+    if (mdtEnabled) {
+      upgradeWriteConfig.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(false).build());
+    } else {
+      upgradeWriteConfig.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build());
+    }
+
+    new UpgradeDowngrade(metaClient, upgradeWriteConfig.build(), context(), SparkUpgradeDowngradeHelper.getInstance())
+        .run(toVersion, null);
+
+    HoodieTableMetaClient resultMetaClient = HoodieTableMetaClient.builder()
+        .setConf(storageConf().newInstance())
+        .setBasePath(metaClient.getBasePath())
+        .build();
+
+    resultMetaClient.reloadTableConfig();
+    // verify record index partition exists after downgrade
+    assertTrue(resultMetaClient.getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath()));
+  }
+
   /**
   * Load a fixture table from resources and copy it to a temporary location for testing.
    */
@@ -536,6 +602,15 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness {
     );
   }
 
+  private static Stream<Arguments> testMdtValidationDowngrade() {
+    return Stream.of(
+        Arguments.of(HoodieTableType.COPY_ON_WRITE, true),
+        Arguments.of(HoodieTableType.COPY_ON_WRITE, false),
+        Arguments.of(HoodieTableType.MERGE_ON_READ, true),
+        Arguments.of(HoodieTableType.MERGE_ON_READ, false)
+    );
+  }
+
   private static Stream<Arguments> testArgsPayloadUpgradeDowngrade() {
     String[] payloadTypes = {
         "default", "overwrite", "partial", "postgres", "mysql",

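A quick way to confirm what the new test asserts, after running a downgrade
against a table (a hedged sketch; getMetadataPartitions() reflects the
hoodie.table.metadata.partitions entry in hoodie.properties, and basePath /
storageConf stand in for the test harness values):

    HoodieTableMetaClient mc = HoodieTableMetaClient.builder()
        .setConf(storageConf.newInstance())
        .setBasePath(basePath)
        .build();
    // even with record index disabled in the downgrade write config,
    // the record_index MDT partition should still be listed
    assertTrue(mc.getTableConfig().getMetadataPartitions()
        .contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath()));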