This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 53583f12c30d3df6ab8cd05edd6237542907dc34
Author: Lin Liu <[email protected]>
AuthorDate: Mon Nov 3 19:31:55 2025 -0800

    fix: Avoid deleting metadata table with MOR during upgrade / downgrade (#14191)
    
    
    ---------
    
    Co-authored-by: Lokesh Jain <[email protected]>
    Co-authored-by: Lokesh Jain <[email protected]>
    Co-authored-by: Y Ethan Guo <[email protected]>
    Co-authored-by: sivabalan <[email protected]>
---
 .../hudi/table/upgrade/UpgradeDowngradeUtils.java  | 103 +++++++++-----
 .../table/upgrade/TestUpgradeDowngradeUtils.java   | 155 ++++++++++++++++++++-
 .../functional/TestNewHoodieParquetFileFormat.java |   2 +-
 .../hudi/table/upgrade/TestUpgradeDowngrade.java   |  16 +++
 .../hudi/functional/TestEightToNineUpgrade.scala   |   2 +-
 .../TestUpgradeOrDowngradeProcedure.scala          |  45 +++---
 6 files changed, 265 insertions(+), 58 deletions(-)

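Reviewer note (not part of the commit): a minimal sketch of what the new
UpgradeDowngradeUtils.setPropertiesBasedOnMetadataPartitions utility is meant
to do. Instead of hard-coding hoodie.metadata.enable=false on the rollback
write config, the config now mirrors the table's existing and inflight
metadata partitions, so the MOR metadata table is preserved across
upgrade/downgrade. Constants and signatures below are taken from the diff;
the mock-table setup follows the added tests.

    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.metadata.HoodieTableMetadataUtil;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.upgrade.UpgradeDowngradeUtils;

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    class MetadataConfigDerivationSketch {
      void sketch() {
        // Mock a table with no index definitions, as in the added tests.
        HoodieTable table = mock(HoodieTable.class);
        HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
        when(table.getMetaClient()).thenReturn(metaClient);
        when(metaClient.getIndexMetadata()).thenReturn(Option.empty());

        TypedProperties properties = new TypedProperties();
        Set<String> metadataPartitions = new HashSet<>(Arrays.asList(
            HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS,
            HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX));

        // Sets hoodie.metadata.enable=true plus the matching per-index
        // flags, rather than disabling the metadata table wholesale.
        UpgradeDowngradeUtils.setPropertiesBasedOnMetadataPartitions(
            properties, metadataPartitions, table);

        assert "true".equals(properties.getString(HoodieMetadataConfig.ENABLE.key()));
      }
    }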
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
index b7c51f31c5cf..9d08375cfebe 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
@@ -28,7 +28,8 @@ import org.apache.hudi.common.model.AWSDmsAvroPayload;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
-import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
 import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.PartialUpdateAvroPayload;
@@ -47,6 +48,7 @@ import org.apache.hudi.common.table.timeline.TimelineFactory;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -72,6 +74,7 @@ import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -84,6 +87,8 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMI
 public class UpgradeDowngradeUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(UpgradeDowngradeUtils.class);
+  static final String FALSE = "false";
+  static final String TRUE = "true";
 
   // Map of actions that were renamed in table version 8
   static final Map<String, String> SIX_TO_EIGHT_TIMELINE_ACTION_MAP = CollectionUtils.createImmutableMap(
@@ -100,36 +105,6 @@ public class UpgradeDowngradeUtils {
       PartialUpdateAvroPayload.class.getName(),
       PostgresDebeziumAvroPayload.class.getName()));
 
-  /**
-   * Utility method to run compaction for MOR table as part of downgrade step.
-   *
-   * @Deprecated Use {@link UpgradeDowngradeUtils#rollbackFailedWritesAndCompact(HoodieTable, HoodieEngineContext, HoodieWriteConfig, SupportsUpgradeDowngrade, boolean, HoodieTableVersion)} instead.
-   */
-  public static void runCompaction(HoodieTable table, HoodieEngineContext context, HoodieWriteConfig config,
-                                   SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    try {
-      if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-        // set required configs for scheduling compaction.
-        HoodieInstantTimeGenerator.setCommitTimeZone(table.getMetaClient().getTableConfig().getTimelineTimezone());
-        HoodieWriteConfig compactionConfig = HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
-        compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
-        compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1");
-        compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(), CompactionTriggerStrategy.NUM_COMMITS.name());
-        compactionConfig.setValue(HoodieCompactionConfig.COMPACTION_STRATEGY.key(), UnBoundedCompactionStrategy.class.getName());
-        compactionConfig.setValue(HoodieMetadataConfig.ENABLE.key(), "false");
-        try (BaseHoodieWriteClient writeClient = upgradeDowngradeHelper.getWriteClient(compactionConfig, context)) {
-          Option<String> compactionInstantOpt = writeClient.scheduleCompaction(Option.empty());
-          if (compactionInstantOpt.isPresent()) {
-            HoodieWriteMetadata result = writeClient.compact(compactionInstantOpt.get());
-            writeClient.commitCompaction(compactionInstantOpt.get(), result, Option.of(table));
-          }
-        }
-          }
-        }
-      }
-    } catch (Exception e) {
-      throw new HoodieException(e);
-    }
-  }
-
   /**
    * See HUDI-6040.
    */
@@ -214,6 +189,13 @@ public class UpgradeDowngradeUtils {
       if (table.isMetadataTable() && tableVersion.equals(HoodieTableVersion.NINE)) {
         properties.put(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.SINGLE_WRITER.name());
       }
+
+      // Set properties based on existing and inflight metadata partitions.
+      Set<String> metadataPartitions = table.getMetaClient().getTableConfig().getMetadataPartitions();
+      metadataPartitions.addAll(table.getMetaClient().getTableConfig().getMetadataPartitionsInflight());
+      setPropertiesBasedOnMetadataPartitions(properties, metadataPartitions, table);
+
+      // Construct rollback config.
       HoodieWriteConfig rollbackWriteConfig = HoodieWriteConfig.newBuilder()
           .withProps(properties)
           .withWriteTableVersion(tableVersion.versionCode())
@@ -231,8 +213,8 @@ public class UpgradeDowngradeUtils {
      } else {
        rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
       }
-      rollbackWriteConfig.setValue(HoodieMetadataConfig.ENABLE.key(), "false");
 
+      // Do the rollback and compact.
      try (BaseHoodieWriteClient writeClient = upgradeDowngradeHelper.getWriteClient(rollbackWriteConfig, context)) {
         writeClient.rollbackFailedWrites(table.getMetaClient());
         if (shouldCompact) {
@@ -248,12 +230,61 @@ public class UpgradeDowngradeUtils {
     }
   }
 
+  @VisibleForTesting
+  public static void setPropertiesBasedOnMetadataPartitions(TypedProperties properties,
+                                                     Set<String> metadataPartitions,
+                                                     HoodieTable table) {
+    if (metadataPartitions.isEmpty()) {
+      properties.put(HoodieMetadataConfig.ENABLE.key(), FALSE);
+      return;
+    }
+    // Read index definitions if any.
+    Option<HoodieIndexMetadata> indexMetadataOpt = table.getMetaClient().getIndexMetadata();
+    Map<String, HoodieIndexDefinition> indexDefinitions = indexMetadataOpt.isEmpty()
+        ? Collections.emptyMap()
+        : indexMetadataOpt.get().getIndexDefinitions();
+    // Enable metadata table.
+    properties.put(HoodieMetadataConfig.ENABLE.key(), TRUE);
+    // Enable individual index.
+    for (String partition : metadataPartitions) {
+      switch (partition) {
+        case HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS:
+        case HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS:
+          properties.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), TRUE);
+          if (indexDefinitions.containsKey(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)) {
+            List<String> sourceFields = indexDefinitions.get(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS).getSourceFields();
+            if (!sourceFields.isEmpty()) {
+              properties.put(HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key(), String.join(",", sourceFields));
+            }
+          }
+          break;
+        case HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS:
+          properties.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER.key(), TRUE);
+          break;
+        case HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX:
+          if (indexDefinitions.containsKey(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)) {
+            Map<String, String> options = indexDefinitions.get(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX).getIndexOptions();
+            if (options.getOrDefault("isPartitioned", FALSE).equals(TRUE)) {
+              properties.put(HoodieMetadataConfig.PARTITIONED_RECORD_INDEX_ENABLE_PROP.key(), TRUE);
+            } else {
+              properties.put(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), TRUE);
+            }
+          } else {
+            properties.put(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), TRUE);
+          }
+          break;
+        default:
+          // No op.
+      }
+    }
+  }
+
   // If the metadata table is enabled for the data table, and
   // existing metadata table is behind the data table, then delete it.
   static void checkAndHandleMetadataTable(HoodieEngineContext context,
-                                                 HoodieTable table,
-                                                 HoodieWriteConfig config,
-                                                 HoodieTableMetaClient metaClient, boolean checkforMetadataLagging) {
+                                          HoodieTable table,
+                                          HoodieWriteConfig config,
+                                          HoodieTableMetaClient metaClient, boolean checkforMetadataLagging) {
     if (!table.isMetadataTable()
         && config.isMetadataTableEnabled()
         && (!checkforMetadataLagging || isMetadataTableBehindDataTable(config, metaClient))) {
@@ -262,7 +293,7 @@ public class UpgradeDowngradeUtils {
   }
 
   static boolean isMetadataTableBehindDataTable(HoodieWriteConfig config,
-                                                       HoodieTableMetaClient metaClient) {
+                                                HoodieTableMetaClient metaClient) {
     // if metadata table does not exist, then it is not behind
     if (!metaClient.getTableConfig().isMetadataTableAvailable()) {
       return false;
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeUtils.java
index a90a57542b3f..1f1ec23b804e 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeUtils.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeUtils.java
@@ -18,19 +18,51 @@
 
 package org.apache.hudi.table.upgrade;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.table.HoodieTable;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
+import static org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.FALSE;
+import static org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.TRUE;
 import static org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.convertCompletionTimeToEpoch;
+import static org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.setPropertiesBasedOnMetadataPartitions;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
-public class TestUpgradeDowngradeUtils {
+@ExtendWith(MockitoExtension.class)
+class TestUpgradeDowngradeUtils {
+
+  private HoodieTable createMockTable(Option<HoodieIndexMetadata> indexMetadataOpt) {
+    HoodieTable table = mock(HoodieTable.class);
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    when(table.getMetaClient()).thenReturn(metaClient);
+    when(metaClient.getIndexMetadata()).thenReturn(indexMetadataOpt);
+    return table;
+  }
 
   @Test
   void testConvertCompletionTimeToEpoch() {
@@ -53,4 +85,125 @@ public class TestUpgradeDowngradeUtils {
 
     assertEquals(-1, convertCompletionTimeToEpoch(inValidInstant), "Epoch time for invalid input should be -1.");
   }
+
+  @Test
+  void testSetPropertiesBasedOnMetadataPartitionsWithEmptySet() {
+    TypedProperties properties = new TypedProperties();
+    Set<String> emptyPartitions = new HashSet<>();
+
+    setPropertiesBasedOnMetadataPartitions(properties, emptyPartitions, null);
+    assertEquals(FALSE, properties.getString(HoodieMetadataConfig.ENABLE.key()));
+  }
+
+  @Test
+  void testSetPropertiesBasedOnMetadataPartitionsWithBloomFilters() {
+    TypedProperties properties = new TypedProperties();
+    Set<String> partitions = new HashSet<>();
+    partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS);
+    HoodieTable table = createMockTable(Option.empty());
+
+    setPropertiesBasedOnMetadataPartitions(properties, partitions, table);
+    assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE.key()));
+    assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER.key()));
+  }
+
+  @Test
+  void testSetPropertiesBasedOnMetadataPartitionsWithAnyOtherIndexes() {
+    TypedProperties properties = new TypedProperties();
+    Set<String> partitions = new HashSet<>();
+    partitions.add("any_other_index");
+    HoodieTable table = createMockTable(Option.empty());
+
+    setPropertiesBasedOnMetadataPartitions(properties, partitions, table);
+    assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE.key()));
+  }
+
+  @Test
+  void testSetPropertiesBasedOnMetadataPartitionsWithMultiplePartitions() {
+    TypedProperties properties = new TypedProperties();
+    Set<String> partitions = new HashSet<>();
+
+    partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS);
+    partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS);
+    partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX);
+    HoodieTable table = createMockTable(Option.empty());
+
+    setPropertiesBasedOnMetadataPartitions(properties, partitions, table);
+    assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE.key()));
+    assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key()));
+    assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER.key()));
+    assertEquals(TRUE, properties.getString(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key()));
+  }
+
+  @Test
+  void testSetPropertiesBasedOnMetadataPartitionsWithColumnStatsAndSourceFields() {
+    TypedProperties properties = new TypedProperties();
+    Set<String> partitions = new HashSet<>();
+    partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS);
+
+    Map<String, HoodieIndexDefinition> indexDefinitions = new HashMap<>();
+    List<String> sourceFields = Arrays.asList("field1", "field2", "field3");
+    HoodieIndexDefinition columnStatsDef = HoodieIndexDefinition.newBuilder()
+        .withIndexName(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
+        .withIndexType(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
+        .withSourceFields(sourceFields)
+        .build();
+    indexDefinitions.put(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, columnStatsDef);
+    HoodieIndexMetadata indexMetadata = new HoodieIndexMetadata(indexDefinitions);
+    HoodieTable table = createMockTable(Option.of(indexMetadata));
+
+    setPropertiesBasedOnMetadataPartitions(properties, partitions, table);
+    assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE.key()));
+    assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key()));
+    assertEquals("field1,field2,field3", properties.getString(HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key()));
+  }
+
+  @Test
+  void testSetPropertiesBasedOnMetadataPartitionsWithRecordIndexPartitioned() {
+    TypedProperties properties = new TypedProperties();
+    Set<String> partitions = new HashSet<>();
+    partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX);
+
+    Map<String, HoodieIndexDefinition> indexDefinitions = new HashMap<>();
+    Map<String, String> indexOptions = new HashMap<>();
+    indexOptions.put("isPartitioned", TRUE);
+    HoodieIndexDefinition recordIndexDef = HoodieIndexDefinition.newBuilder()
+        .withIndexName(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
+        .withIndexType(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
+        .withIndexOptions(indexOptions)
+        .build();
+    indexDefinitions.put(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX, recordIndexDef);
+    HoodieIndexMetadata indexMetadata = new HoodieIndexMetadata(indexDefinitions);
+    HoodieTable table = createMockTable(Option.of(indexMetadata));
+
+    setPropertiesBasedOnMetadataPartitions(properties, partitions, table);
+    assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE.key()));
+    assertEquals(TRUE, properties.getString(HoodieMetadataConfig.PARTITIONED_RECORD_INDEX_ENABLE_PROP.key()));
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testSetPropertiesBasedOnMetadataPartitionsWithRecordIndexNonPartitioned(boolean explicitlySetPartitionedInIndexDef) {
+    TypedProperties properties = new TypedProperties();
+    Set<String> partitions = new HashSet<>();
+    partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX);
+
+    Map<String, HoodieIndexDefinition> indexDefinitions = new HashMap<>();
+    Map<String, String> indexOptions = new HashMap<>();
+    if (explicitlySetPartitionedInIndexDef) {
+      indexOptions.put("isPartitioned", FALSE);
+    }
+    HoodieIndexDefinition recordIndexDef = HoodieIndexDefinition.newBuilder()
+        .withIndexName(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
+        .withIndexType(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
+        .withIndexOptions(indexOptions)
+        .build();
+    indexDefinitions.put(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX, recordIndexDef);
+    HoodieIndexMetadata indexMetadata = new HoodieIndexMetadata(indexDefinitions);
+    HoodieTable table = createMockTable(Option.of(indexMetadata));
+
+    setPropertiesBasedOnMetadataPartitions(properties, partitions, table);
+    assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE.key()));
+    assertEquals(TRUE, properties.getString(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key()));
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
index 0053c3f4a977..f1297643c72f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
@@ -53,7 +53,7 @@ public class TestNewHoodieParquetFileFormat extends TestBootstrapReadBase {
 
   @ParameterizedTest
   @MethodSource("testArgs")
-  public void testNewParquetFileFormat(HoodieTableType tableType, Integer nPartitions) {
+  void testNewParquetFileFormat(HoodieTableType tableType, Integer nPartitions) {
     this.bootstrapType = nPartitions == 0 ? "metadata" : "mixed";
     this.dashPartitions = true;
     this.tableType = tableType;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 032eda704eb7..6ce364a1557a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -41,6 +41,8 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
 import org.apache.spark.api.java.JavaRDD;
@@ -101,6 +103,20 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness {
     }
   }
 
+  @Test
+  public void testUpgradeDowngradeUtilsCompaction() throws Exception {
+    HoodieTableVersion originalVersion = HoodieTableVersion.EIGHT;
+    HoodieTableMetaClient originalMetaClient = loadFixtureTable(HoodieTableVersion.EIGHT, "-mor");
+    HoodieWriteConfig config = createWriteConfig(originalMetaClient, false);
+
+    int numCompactionInstants = originalMetaClient.getActiveTimeline().filterCompletedOrMajorOrMinorCompactionInstants().countInstants();
+    HoodieTable table = HoodieSparkTable.create(config, context(), metaClient);
+    UpgradeDowngradeUtils.rollbackFailedWritesAndCompact(table, context(), config, SparkUpgradeDowngradeHelper.getInstance(), true, originalVersion);
+    originalMetaClient = HoodieTableMetaClient.reload(originalMetaClient);
+    assertEquals(numCompactionInstants + 1, originalMetaClient.getActiveTimeline().filterCompletedOrMajorOrMinorCompactionInstants().countInstants());
+    assertTrue(originalMetaClient.getTableConfig().isMetadataTableAvailable());
+  }
+
   @Disabled
   @ParameterizedTest
   @MethodSource("upgradeDowngradeVersionPairs")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala
index 372adbc713a3..d100f35f70b4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala
@@ -129,7 +129,7 @@ class TestEightToNineUpgrade extends RecordLevelIndexTestBase {
     val payloadClass = classOf[MySqlDebeziumAvroPayload].getName
     var opts: Map[String, String] = Map(
       HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClass,
-      HoodieMetadataConfig.ENABLE.key() -> "false"
+      HoodieMetadataConfig.ENABLE.key() -> "true"
     )
     val columns = Seq("ts", "key", "rider", "driver", DebeziumConstants.FLATTENED_FILE_COL_NAME, DebeziumConstants.FLATTENED_POS_COL_NAME,
       DebeziumConstants.ADDED_SEQ_COL_NAME, DebeziumConstants.FLATTENED_OP_COL_NAME)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
index c588b3f4afbb..05ff2514911b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
@@ -21,10 +21,12 @@ import org.apache.hudi.common.config.HoodieConfig
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion}
 import org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR
 import org.apache.hudi.common.util.{BinaryUtil, ConfigUtils, StringUtils}
+import org.apache.hudi.metadata.MetadataPartitionType
 import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.NAME_FORMAT_0_X
+import org.junit.jupiter.api.Assertions.assertTrue
 
 import java.io.IOException
 import java.time.Instant
@@ -148,7 +150,7 @@ class TestUpgradeOrDowngradeProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
-  test("Test downgrade table from version eight to version seven") {
+  test("Test downgrade table to version six") {
     withTempDir { tmp =>
       val tableName = generateTableName
       val tablePath = s"${tmp.getCanonicalPath}/$tableName"
@@ -169,32 +171,37 @@ class TestUpgradeOrDowngradeProcedure extends HoodieSparkProcedureTestBase {
            | )
        """.stripMargin)
 
+      spark.sql("set hoodie.merge.small.file.group.candidates.limit=0")
       spark.sql("set hoodie.compact.inline=true")
-      spark.sql("set hoodie.compact.inline.max.delta.commits=1")
+      spark.sql("set hoodie.compact.inline.max.delta.commits=4")
       spark.sql("set hoodie.clean.commits.retained = 2")
       spark.sql("set hoodie.keep.min.commits = 3")
       spark.sql("set hoodie.keep.min.commits = 4")
+      spark.sql("set hoodie.metadata.record.index.enable = true")
+
       spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"update $tableName set name = 'a2' where id = 1")
+      spark.sql(s"update $tableName set name = 'a3' where id = 1")
 
       var metaClient = createMetaClient(spark, tablePath)
-      // verify hoodie.table.version of the table is EIGHT
-      if (metaClient.getTableConfig.getTableVersion.versionCode().equals(HoodieTableVersion.EIGHT.versionCode())) {
-        // downgrade table from version eight to version seven
-        checkAnswer(s"""call downgrade_table(table => '$tableName', to_version 
=> 'SEVEN')""")(Seq(true))
-        metaClient = HoodieTableMetaClient.reload(metaClient)
-        assertResult(HoodieTableVersion.SEVEN.versionCode) {
-          metaClient.getTableConfig.getTableVersion.versionCode()
-        }
-        // Verify whether the naming format of instant files is consistent with 0.x
-        metaClient.reloadActiveTimeline().getInstants.iterator().asScala.forall(f => NAME_FORMAT_0_X.matcher(INSTANT_FILE_NAME_GENERATOR.getFileName(f)).find())
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 10.0, 1000)
-        )
+      val numCompactionInstants = metaClient.getActiveTimeline.filterCompletedOrMajorOrMinorCompactionInstants.countInstants
+      // Disabling record index should not affect downgrade
+      spark.sql("set hoodie.metadata.record.index.enable = false")
+      // downgrade table to version six
+      checkAnswer(s"""call downgrade_table(table => '$tableName', to_version 
=> 'SIX')""")(Seq(true))
+      metaClient = createMetaClient(spark, tablePath)
+      assertResult(numCompactionInstants + 1)(metaClient.getActiveTimeline.filterCompletedOrMajorOrMinorCompactionInstants.countInstants)
+      assertResult(HoodieTableVersion.SIX.versionCode) {
+        metaClient.getTableConfig.getTableVersion.versionCode()
       }
+      // Verify whether the naming format of instant files is consistent with 0.x
+      metaClient.reloadActiveTimeline().getInstants.iterator().asScala.forall(f => NAME_FORMAT_0_X.matcher(INSTANT_FILE_NAME_GENERATOR.getFileName(f)).find())
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a3", 10.0, 1000)
+      )
+      // Ensure files and record index partition are available after downgrade
+      assertTrue(metaClient.getTableConfig.isMetadataTableAvailable)
+      assertTrue(metaClient.getTableConfig.isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX))
     }
   }
 
