This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b22e8d9558ea [HUDI-9684] Add Upgrade/Downgrade Test Fixtures Framework 
(#13669)
b22e8d9558ea is described below

commit b22e8d9558ea62a439eb04960d721263728891be
Author: Rahil C <[email protected]>
AuthorDate: Mon Aug 4 18:39:14 2025 -0700

    [HUDI-9684] Add Upgrade/Downgrade Test Fixtures Framework (#13669)
---
 .gitignore                                         |   3 +
 ...ngrade.java => TestUpgradeDowngradeLegacy.java} |   3 +-
 .../table/read/TestHoodieFileGroupReaderBase.java  |  27 +-
 .../hudi/common/testutils/HoodieTestUtils.java     |  50 ++
 .../hudi/table/upgrade/TestUpgradeDowngrade.java   | 586 +++++++++++++++++++++
 .../resources/upgrade-downgrade-fixtures/README.md | 192 +++++++
 .../generate-fixtures.sh                           | 341 ++++++++++++
 .../mor-tables/hudi-v4-table.zip                   | Bin 0 -> 55017 bytes
 .../mor-tables/hudi-v5-table.zip                   | Bin 0 -> 51947 bytes
 .../mor-tables/hudi-v6-table.zip                   | Bin 0 -> 55824 bytes
 .../mor-tables/hudi-v8-table.zip                   | Bin 0 -> 122258 bytes
 .../mor-tables/hudi-v9-table.zip                   | Bin 0 -> 123797 bytes
 .../scala-templates/generate-fixture.scala         | 106 ++++
 13 files changed, 1281 insertions(+), 27 deletions(-)

diff --git a/.gitignore b/.gitignore
index daf30008a5de..05fb43d452aa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -87,3 +87,6 @@ dependency-reduced-pom.xml
 hudi-integ-test/compose_env
 node_modules
 package-lock.json
+
+# Spark Binaries used for testing
+hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/spark-versions/
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeLegacy.java
similarity index 99%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
rename to 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeLegacy.java
index a54d09caee06..245af8e4b3ca 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeLegacy.java
@@ -110,8 +110,9 @@ import static org.mockito.Mockito.when;
 
 /**
  * Unit tests {@link UpgradeDowngrade}.
+ * New tests for upgrade and downgrade operations should be added to 
TestUpgradeDowngrade using the test fixtures.
  */
-public class TestUpgradeDowngrade extends HoodieClientTestBase {
+public class TestUpgradeDowngradeLegacy extends HoodieClientTestBase {
 
   private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with 
deletePartialMarkerFiles={0} and TableType = {1}";
   private static final String TEST_NAME_WITH_DOWNGRADE_PARAMS = "[{index}] 
Test with deletePartialMarkerFiles={0} and TableType = {1} and "
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index e681ba8a8691..147b677ee72a 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -71,8 +71,6 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.io.BufferedOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.UncheckedIOException;
@@ -90,8 +88,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
 
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
@@ -540,7 +536,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
   @Test
   public void testReadFileGroupInBootstrapMergeOnReadTable() throws Exception {
     Path zipOutput = Paths.get(new URI(getBasePath()));
-    extract(zipOutput);
+    
HoodieTestUtils.extractZipToDirectory("file-group-reader/bootstrap_data.zip", 
zipOutput, getClass());
     ObjectMapper objectMapper = new ObjectMapper();
     Path basePath = zipOutput.resolve("bootstrap_data");
     List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords = new 
ArrayList<>();
@@ -954,25 +950,4 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     return partitionPath;
   }
 
-  private void extract(Path target) throws IOException {
-    try (ZipInputStream zip = new 
ZipInputStream(this.getClass().getClassLoader().getResourceAsStream("file-group-reader/bootstrap_data.zip")))
 {
-      ZipEntry entry;
-
-      while ((entry = zip.getNextEntry()) != null) {
-        File file = target.resolve(entry.getName()).toFile();
-        if (entry.isDirectory()) {
-          file.mkdirs();
-          continue;
-        }
-        byte[] buffer = new byte[10000];
-        file.getParentFile().mkdirs();
-        try (BufferedOutputStream out = new 
BufferedOutputStream(Files.newOutputStream(file.toPath()))) {
-          int count;
-          while ((count = zip.read(buffer)) != -1) {
-            out.write(buffer, 0, count);
-          }
-        }
-      }
-    }
-  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 6bd86f60a95f..9ee9f1ba8be6 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -63,10 +63,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.junit.jupiter.api.Assumptions;
 
+import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -444,6 +451,49 @@ public class HoodieTestUtils {
     return deleteFileList;
   }
 
+  /**
+   * Extracts a ZIP file from resources to a target directory.
+   * 
+   * @param resourcePath the path to the ZIP resource (relative to classpath)
+   * @param targetDirectory the target directory to extract files to
+   * @param resourceClass the class to use for resource loading
+   * @throws IOException if extraction fails
+   */
+  public static void extractZipToDirectory(String resourcePath, Path 
targetDirectory, Class<?> resourceClass) throws IOException {
+    InputStream resourceStream = 
resourceClass.getClassLoader().getResourceAsStream(resourcePath);
+    if (resourceStream == null) {
+      // Fallback to getResourceAsStream if 
getClassLoader().getResourceAsStream() fails
+      resourceStream = resourceClass.getResourceAsStream(resourcePath);
+    }
+    
+    if (resourceStream == null) {
+      throw new IOException("Resource not found at: " + resourcePath);
+    }
+
+    try (ZipInputStream zip = new ZipInputStream(resourceStream)) {
+      ZipEntry entry;
+      while ((entry = zip.getNextEntry()) != null) {
+        File file = targetDirectory.resolve(entry.getName()).toFile();
+        if (entry.isDirectory()) {
+          file.mkdirs();
+          continue;
+        }
+        
+        // Create parent directories if they don't exist
+        file.getParentFile().mkdirs();
+        
+        // Extract file content
+        byte[] buffer = new byte[10000];
+        try (BufferedOutputStream out = new 
BufferedOutputStream(Files.newOutputStream(file.toPath()))) {
+          int count;
+          while ((count = zip.read(buffer)) != -1) {
+            out.write(buffer, 0, count);
+          }
+        }
+      }
+    }
+  }
+
   public static void validateTableConfig(HoodieStorage storage,
                                          String basePath,
                                          Map<String, String> expectedConfigs,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
new file mode 100644
index 000000000000..467ef97f45fc
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for upgrade/downgrade operations using pre-created fixture tables
+ * from different Hudi releases.
+ */
+public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestUpgradeDowngrade.class);
+  private static final String FIXTURES_BASE_PATH = 
"/upgrade-downgrade-fixtures/mor-tables/";
+  
+  @TempDir
+  java.nio.file.Path tempDir;
+  
+  private HoodieTableMetaClient metaClient;
+
+  @ParameterizedTest
+  @MethodSource("upgradeDowngradeVersionPairs")
+  public void testUpgradeOrDowngrade(HoodieTableVersion fromVersion, 
HoodieTableVersion toVersion) throws Exception {
+    boolean isUpgrade = fromVersion.lesserThan(toVersion);
+    String operation = isUpgrade ? "upgrade" : "downgrade";
+    LOG.info("Testing {} from version {} to {}", operation, fromVersion, 
toVersion);
+    
+    HoodieTableMetaClient originalMetaClient = loadFixtureTable(fromVersion);
+    assertEquals(fromVersion, 
originalMetaClient.getTableConfig().getTableVersion(),
+        "Fixture table should be at expected version");
+    
+    HoodieWriteConfig config = createWriteConfig(originalMetaClient, true);
+    
+    int initialPendingCommits = 
originalMetaClient.getCommitsTimeline().filterPendingExcludingCompaction().countInstants();
+    int initialCompletedCommits = 
originalMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants();
+    
+    Dataset<Row> originalData = readTableData(originalMetaClient, "before " + 
operation);
+    
+    new UpgradeDowngrade(originalMetaClient, config, context(), 
SparkUpgradeDowngradeHelper.getInstance())
+        .run(toVersion, null);
+    
+    HoodieTableMetaClient resultMetaClient = HoodieTableMetaClient.builder()
+        .setConf(storageConf().newInstance())
+        .setBasePath(originalMetaClient.getBasePath())
+        .build();
+    
+    assertTableVersionOnDataAndMetadataTable(resultMetaClient, toVersion);
+    validateVersionSpecificProperties(resultMetaClient, fromVersion, 
toVersion);
+    validateDataConsistency(originalData, resultMetaClient, "after " + 
operation);
+    
+    // Validate pending commits based on whether this transition performs 
rollback operations
+    int finalPendingCommits = 
resultMetaClient.getCommitsTimeline().filterPendingExcludingCompaction().countInstants();
+    if (isRollbackTransition(fromVersion, toVersion)) {
+      // Handlers that call rollbackFailedWritesAndCompact() clear all pending 
commits
+      assertEquals(0, finalPendingCommits,
+          "Pending commits should be cleared to 0 after " + operation);
+    } else {
+      // Other handlers may clean up some pending commits but don't 
necessarily clear all
+      assertTrue(finalPendingCommits <= initialPendingCommits,
+          "Pending commits should be cleaned up or reduced after " + 
operation);
+    }
+    
+    int finalCompletedCommits = 
resultMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants();
+    assertTrue(finalCompletedCommits >= initialCompletedCommits,
+        "Completed commits should be preserved or increased after " + 
operation);
+
+    LOG.info("Successfully completed {} test for version {} -> {}", operation, 
fromVersion, toVersion);
+  }
+
+  @ParameterizedTest
+  @MethodSource("tableVersions")
+  public void testAutoUpgradeDisabled(HoodieTableVersion originalVersion) 
throws Exception {
+    LOG.info("Testing auto-upgrade disabled for version {}", originalVersion);
+    
+    HoodieTableMetaClient originalMetaClient = 
loadFixtureTable(originalVersion);
+    
+    Option<HoodieTableVersion> targetVersionOpt = 
getNextVersion(originalVersion);
+    if (!targetVersionOpt.isPresent()) {
+      LOG.info("Skipping auto-upgrade test for version {} (no higher version 
available)", originalVersion);
+      return;
+    }
+    HoodieTableVersion targetVersion = targetVersionOpt.get();
+    
+    HoodieWriteConfig config = createWriteConfig(originalMetaClient, false);
+    
+    // Attempt upgrade with auto-upgrade disabled
+    new UpgradeDowngrade(originalMetaClient, config, context(), 
SparkUpgradeDowngradeHelper.getInstance())
+        .run(targetVersion, null);
+    
+    // Create fresh meta client to validate that version remained unchanged 
+    HoodieTableMetaClient unchangedMetaClient = HoodieTableMetaClient.builder()
+        .setConf(storageConf().newInstance())
+        .setBasePath(originalMetaClient.getBasePath())
+        .build();
+    assertEquals(originalVersion, 
unchangedMetaClient.getTableConfig().getTableVersion(),
+        "Table version should remain unchanged when auto-upgrade is disabled");
+    validateVersionSpecificProperties(unchangedMetaClient, originalVersion);
+    performDataValidationOnTable(unchangedMetaClient, "after auto-upgrade 
disabled test");
+    
+    LOG.info("Auto-upgrade disabled test passed for version {}", 
originalVersion);
+  }
+
+  /**
+   * Load a fixture table from resources and copy it to a temporary location 
for testing.
+   */
+  private HoodieTableMetaClient loadFixtureTable(HoodieTableVersion version) 
throws IOException {
+    String fixtureName = getFixtureName(version);
+    String resourcePath = FIXTURES_BASE_PATH + fixtureName;
+    
+    LOG.info("Loading fixture from resource path: {}", resourcePath);
+    HoodieTestUtils.extractZipToDirectory(resourcePath, tempDir, getClass());
+    
+    String tableName = fixtureName.replace(".zip", "");
+    String tablePath = tempDir.resolve(tableName).toString();
+    
+    metaClient = HoodieTableMetaClient.builder()
+        .setConf(storageConf().newInstance())
+        .setBasePath(tablePath)
+        .build();
+    
+    LOG.info("Loaded fixture table {} at version {}", fixtureName, 
metaClient.getTableConfig().getTableVersion());
+    return metaClient;
+  }
+
+  private HoodieWriteConfig createWriteConfig(HoodieTableMetaClient 
metaClient, boolean autoUpgrade) {
+    Properties props = new Properties();
+    HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
+        .withPath(metaClient.getBasePath().toString())
+        .withAutoUpgradeVersion(autoUpgrade)
+        .withProps(props);
+
+    // For validation operations, keep timeline server disabled for simplicity
+    if (!autoUpgrade) {
+      builder.withEmbeddedTimelineServerEnabled(false);
+    }
+    
+    return builder.build();
+  }
+
+  private Option<HoodieTableVersion> getNextVersion(HoodieTableVersion 
current) {
+    switch (current) {
+      case FOUR:
+        return Option.of(HoodieTableVersion.FIVE);
+      case FIVE:
+        return Option.of(HoodieTableVersion.SIX);
+      case SIX:
+        // even though there is a table version 7, this is not an official 
release and serves as a bridge
+        // so the next version should be 8
+        return Option.of(HoodieTableVersion.EIGHT);
+      case EIGHT:
+        return Option.of(HoodieTableVersion.NINE);
+      case NINE:
+      default:
+        return Option.empty();
+    }
+  }
+
+  /**
+   * Get fixture zip file name for a given table version.
+   */
+  private String getFixtureName(HoodieTableVersion version) {
+    switch (version) {
+      case FOUR:
+        return "hudi-v4-table.zip";
+      case FIVE:
+        return "hudi-v5-table.zip";
+      case SIX:
+        return "hudi-v6-table.zip";
+      case EIGHT:
+        return "hudi-v8-table.zip";
+      case NINE:
+        return "hudi-v9-table.zip";
+      default:
+        throw new IllegalArgumentException("Unsupported fixture version: " + 
version);
+    }
+  }
+
+  private static Stream<Arguments> tableVersions() {
+    return Stream.of(
+        Arguments.of(HoodieTableVersion.FOUR),   // Hudi 0.11.1
+        Arguments.of(HoodieTableVersion.FIVE),   // Hudi 0.12.2
+        Arguments.of(HoodieTableVersion.SIX),    // Hudi 0.14
+        Arguments.of(HoodieTableVersion.EIGHT),  // Hudi 1.0.2
+        Arguments.of(HoodieTableVersion.NINE)    // Hudi 1.1
+    );
+  }
+
+  private static Stream<Arguments> upgradeDowngradeVersionPairs() {
+    return Stream.of(
+        // Upgrade test cases
+        Arguments.of(HoodieTableVersion.FOUR, HoodieTableVersion.FIVE),   // 
V4 -> V5
+        Arguments.of(HoodieTableVersion.FIVE, HoodieTableVersion.SIX),    // 
V5 -> V6  
+        Arguments.of(HoodieTableVersion.SIX, HoodieTableVersion.EIGHT),   // 
V6 -> V8
+        Arguments.of(HoodieTableVersion.EIGHT, HoodieTableVersion.NINE),  // 
V8 -> V9
+        
+        // Downgrade test cases
+        Arguments.of(HoodieTableVersion.NINE, HoodieTableVersion.EIGHT),  // 
V9 -> V8
+        Arguments.of(HoodieTableVersion.EIGHT, HoodieTableVersion.SIX),   // 
V8 -> V6
+        Arguments.of(HoodieTableVersion.SIX, HoodieTableVersion.FIVE),    // 
V6 -> V5
+        Arguments.of(HoodieTableVersion.FIVE, HoodieTableVersion.FOUR)    // 
V5 -> V4
+    );
+  }
+
+  /**
+   * Assert table version on both data table and metadata table (if exists).
+   */
+  private void assertTableVersionOnDataAndMetadataTable(
+      HoodieTableMetaClient metaClient, HoodieTableVersion expectedVersion) 
throws IOException {
+    assertTableVersion(metaClient, expectedVersion);
+    if (expectedVersion.greaterThanOrEquals(HoodieTableVersion.FOUR)) {
+      StoragePath metadataTablePath = 
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath());
+      if (metaClient.getStorage().exists(metadataTablePath)) {
+        LOG.info("Verifying metadata table version at: {}", metadataTablePath);
+        HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
+            
.setConf(metaClient.getStorageConf().newInstance()).setBasePath(metadataTablePath).build();
+        assertTableVersion(mdtMetaClient, expectedVersion);
+      } else {
+        LOG.info("Metadata table does not exist at: {}", metadataTablePath);
+      }
+    }
+  }
+
+  private void assertTableVersion(
+      HoodieTableMetaClient metaClient, HoodieTableVersion expectedVersion) {
+    assertEquals(expectedVersion,
+        metaClient.getTableConfig().getTableVersion());
+  }
+
+  /**
+   * Validate version-specific properties after upgrade/downgrade operations.
+   */
+  private void validateVersionSpecificProperties(
+      HoodieTableMetaClient metaClient, HoodieTableVersion fromVersion, 
HoodieTableVersion toVersion) throws IOException {
+    LOG.info("Validating version-specific properties: {} -> {}", fromVersion, 
toVersion);
+    
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    
+    // Validate properties for target version
+    switch (toVersion) {
+      case FOUR:
+        validateVersion4Properties(metaClient, tableConfig);
+        break;
+      case FIVE:
+        validateVersion5Properties(metaClient, tableConfig);
+        break;
+      case SIX:
+        validateVersion6Properties(metaClient);
+        break;
+      case EIGHT:
+        validateVersion8Properties(tableConfig);
+        break;
+      case NINE:
+        validateVersion9Properties(metaClient, tableConfig);
+        break;
+      default:
+        LOG.warn("No specific property validation for version {}", toVersion);
+    }
+  }
+
+  /**
+   * Validate version-specific properties for a single table version.
+   */
+  private void validateVersionSpecificProperties(
+      HoodieTableMetaClient metaClient, HoodieTableVersion version) throws 
IOException {
+    LOG.info("Validating version-specific properties for version {}", version);
+    
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    
+    // Validate properties for the version
+    switch (version) {
+      case FOUR:
+        validateVersion4Properties(metaClient, tableConfig);
+        break;
+      case FIVE:
+        validateVersion5Properties(metaClient, tableConfig);
+        break;
+      case SIX:
+        validateVersion6Properties(metaClient);
+        break;
+      case EIGHT:
+        validateVersion8Properties(tableConfig);
+        break;
+      case NINE:
+        validateVersion9Properties(metaClient, tableConfig);
+        break;
+      default:
+        LOG.warn("No specific property validation for version {}", version);
+    }
+  }
+
+  private void validateVersion4Properties(HoodieTableMetaClient metaClient, 
HoodieTableConfig tableConfig) throws IOException {
+    assertTrue(tableConfig.contains(HoodieTableConfig.TABLE_CHECKSUM),
+        "TABLE_CHECKSUM should be set for V4");
+    String actualChecksum = 
tableConfig.getString(HoodieTableConfig.TABLE_CHECKSUM);
+    assertNotNull(actualChecksum, "TABLE_CHECKSUM should not be null");
+    
+    // Validate that the checksum is valid by comparing with computed checksum
+    String expectedChecksum = 
String.valueOf(HoodieTableConfig.generateChecksum(tableConfig.getProps()));
+    assertEquals(expectedChecksum, actualChecksum, 
+        "TABLE_CHECKSUM should match computed checksum");
+
+    assertEquals(TimelineLayoutVersion.LAYOUT_VERSION_1, 
tableConfig.getTimelineLayoutVersion().get());
+    
+    // TABLE_METADATA_PARTITIONS should be properly set if present
+    // Note: This is optional based on whether metadata table was enabled 
during upgrade
+    // After downgrade operations, metadata table may be deleted, so we check 
if it exists first
+    if (tableConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS)) {
+      if (isMetadataTablePresent(metaClient)) {
+        // Metadata table exists - enforce strict validation
+        String metadataPartitions = 
tableConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS);
+        assertTrue(metadataPartitions.contains("files"), 
+            "TABLE_METADATA_PARTITIONS should contain 'files' partition when 
metadata table exists");
+      } else {
+        // Metadata table doesn't exist (likely after downgrade) - validation 
not applicable
+        LOG.info("Skipping TABLE_METADATA_PARTITIONS 'files' validation - 
metadata table does not exist (likely after downgrade operation)");
+      }
+    }
+  }
+
+  private boolean isMetadataTablePresent(HoodieTableMetaClient metaClient) 
throws IOException {
+    StoragePath metadataTablePath = 
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath());
+    return metaClient.getStorage().exists(metadataTablePath);
+  }
+  
+  /**
+   * Determine if a version transition performs rollback operations that clear 
all pending commits.
+   * These handlers call rollbackFailedWritesAndCompact() which clears pending 
commits to 0.
+   */
+  private boolean isRollbackTransition(HoodieTableVersion fromVersion, 
HoodieTableVersion toVersion) {
+    // Upgrade handlers that perform rollbacks
+    if (fromVersion == HoodieTableVersion.SEVEN && toVersion == 
HoodieTableVersion.EIGHT) {
+      return true; // SevenToEightUpgradeHandler
+    }
+    if (fromVersion == HoodieTableVersion.EIGHT && toVersion == 
HoodieTableVersion.NINE) {
+      return true; // EightToNineUpgradeHandler
+    }
+    
+    // Downgrade handlers that perform rollbacks
+    if (fromVersion == HoodieTableVersion.SIX && toVersion == 
HoodieTableVersion.FIVE) {
+      return true; // SixToFiveDowngradeHandler
+    }
+    if (fromVersion == HoodieTableVersion.EIGHT && toVersion == 
HoodieTableVersion.SEVEN) {
+      return true; // EightToSevenDowngradeHandler  
+    }
+    if (fromVersion == HoodieTableVersion.NINE && toVersion == 
HoodieTableVersion.EIGHT) {
+      return true; // NineToEightDowngradeHandler
+    }
+    
+    return false; // All other transitions don't perform rollbacks
+  }
+
+  private void validateVersion5Properties(HoodieTableMetaClient metaClient, 
HoodieTableConfig tableConfig) throws IOException {
+
+    validateVersion4Properties(metaClient, tableConfig);
+    // Version 5 upgrade validates that no deprecated default partition paths 
exist
+    // The upgrade handler checks for DEPRECATED_DEFAULT_PARTITION_PATH 
("default") 
+    // and requires migration to DEFAULT_PARTITION_PATH 
("__HIVE_DEFAULT_PARTITION__")
+    
+    // If table is partitioned, validate partition path migration
+    if (tableConfig.isTablePartitioned()) {
+      LOG.info("Validating V5 partition path migration for partitioned table");
+      
+      // Check hive-style partitioning configuration
+      boolean hiveStylePartitioningEnable = 
Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable());
+      LOG.info("Hive-style partitioning enabled: {}", 
hiveStylePartitioningEnable);
+      
+      // Validate partition field configuration exists
+      assertTrue(tableConfig.getPartitionFields().isPresent(),
+          "Partition fields should be present for partitioned table in V5");
+    } else {
+      LOG.info("Non-partitioned table - skipping partition path validation for 
V5");
+    }
+  }
+
+  private void validateVersion6Properties(HoodieTableMetaClient metaClient) 
throws IOException {
+    // Version 6 upgrade deletes compaction requested files from .aux folder 
(HUDI-6040)
+    // Validate that no REQUESTED compaction files remain in auxiliary folder
+    validateVersion5Properties(metaClient, metaClient.getTableConfig());
+
+    StoragePath auxPath = new StoragePath(metaClient.getMetaAuxiliaryPath());
+    
+    if (!metaClient.getStorage().exists(auxPath)) {
+      // Auxiliary folder doesn't exist - this is valid, nothing to clean up
+      LOG.info("V6 validation passed: Auxiliary folder does not exist");
+      return;
+    }
+    
+    // Auxiliary folder exists - validate that REQUESTED compaction files were 
cleaned up
+    LOG.info("V6 validation: Checking auxiliary folder cleanup at: {}", 
auxPath);
+    
+    // Get pending compaction timeline with REQUESTED state (same as upgrade 
handler)
+    HoodieTimeline compactionTimeline = 
metaClient.getActiveTimeline().filterPendingCompactionTimeline()
+        .filter(instant -> instant.getState() == 
HoodieInstant.State.REQUESTED);
+    
+    InstantFileNameGenerator factory = 
metaClient.getInstantFileNameGenerator();
+    
+    // Validate that none of the REQUESTED compaction files exist in auxiliary 
folder
+    compactionTimeline.getInstantsAsStream().forEach(instant -> {
+      StoragePath compactionFile = new 
StoragePath(metaClient.getMetaAuxiliaryPath(), factory.getFileName(instant));
+      try {
+        if (metaClient.getStorage().exists(compactionFile)) {
+          throw new AssertionError("V6 validation failed: REQUESTED compaction 
file should have been cleaned up but still exists: " + compactionFile);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to check existence of compaction 
file: " + compactionFile, e);
+      }
+    });
+    
+    LOG.info("V6 validation passed: {} REQUESTED compaction instants verified 
to be cleaned up from auxiliary folder", 
+        compactionTimeline.countInstants());
+  }
+
  /**
   * Validates that all table config properties required by table version 8 are present
   * and correctly valued.
   *
   * @param tableConfig table config read from the table under test
   */
  private void validateVersion8Properties(HoodieTableConfig tableConfig) {
    // V8 moved the timeline to layout V2 and records the layout version explicitly.
    Option<TimelineLayoutVersion> layoutVersion = tableConfig.getTimelineLayoutVersion();
    assertTrue(layoutVersion.isPresent(), "Timeline layout version should be present for V8+");
    assertEquals(TimelineLayoutVersion.LAYOUT_VERSION_2, layoutVersion.get(),
        "Timeline layout should be V2 for V8+");

    // The timeline directory location must be recorded; the default value is expected here.
    assertTrue(tableConfig.contains(HoodieTableConfig.TIMELINE_PATH),
        "Timeline path should be set for V8");
    assertEquals(HoodieTableConfig.TIMELINE_PATH.defaultValue(),
        tableConfig.getString(HoodieTableConfig.TIMELINE_PATH),
        "Timeline path should have default value");

    // Record merging semantics (mode, strategy id, payload class) became explicit
    // table properties in V8.
    assertTrue(tableConfig.contains(HoodieTableConfig.RECORD_MERGE_MODE),
        "Record merge mode should be set for V8");
    RecordMergeMode mergeMode = tableConfig.getRecordMergeMode();
    assertNotNull(mergeMode, "Merge mode should not be null");

    assertTrue(tableConfig.contains(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID),
        "Record merge strategy ID should be set for V8");

    assertTrue(tableConfig.contains(HoodieTableConfig.PAYLOAD_CLASS_NAME),
        "Payload class should be set for V8");

    assertTrue(tableConfig.contains(HoodieTableConfig.INITIAL_VERSION),
        "Initial version should be set for V8");

    // Key generator type is only mandatory when a key generator class is configured.
    if (tableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)) {
      assertTrue(tableConfig.contains(HoodieTableConfig.KEY_GENERATOR_TYPE),
          "Key generator type should be set when key generator class is present");
    }
  }
+
+  private void validateVersion9Properties(HoodieTableMetaClient metaClient, 
HoodieTableConfig tableConfig) {
+    validateVersion8Properties(tableConfig);
+
+    // Check if index metadata exists and has proper version information
+    Option<HoodieIndexMetadata> indexMetadata = metaClient.getIndexMetadata();
+    if (indexMetadata.isPresent()) {
+      indexMetadata.get().getIndexDefinitions().forEach((indexName, indexDef) 
-> {
+        assertNotNull(indexDef.getVersion(), 
+            "Index " + indexName + " should have version information in V9");
+      });
+    }
+  }
+
+  /**
+   * Read table data for validation purposes.
+   */
+  private Dataset<Row> readTableData(HoodieTableMetaClient metaClient, String 
stage) {
+    LOG.info("Reading table data {}", stage);
+    
+    try {
+      String basePath = metaClient.getBasePath().toString();
+      Dataset<Row> tableData = sqlContext().read()
+          .format("hudi")
+          .load(basePath);
+
+      assertNotNull(tableData, "Table read should not return null " + stage);
+      
+      // Force execution to ensure data is read immediately (not lazily)
+      List<Row> rows = tableData.collectAsList();
+      long rowCount = rows.size();
+      assertTrue(rowCount >= 0, "Row count should be non-negative " + stage);
+      
+      // Convert collected rows back to Dataset for use in validation
+      Dataset<Row> materializedData = sqlContext().createDataFrame(rows, 
tableData.schema());
+      
+      LOG.info("Successfully read and materialized table data {} ({} rows)", 
stage, rowCount);
+      return materializedData;
+    } catch (Exception e) {
+      LOG.error("Failed to read table data {} from: {} (version: {})", 
+          stage, metaClient.getBasePath(), 
metaClient.getTableConfig().getTableVersion(), e);
+      throw new RuntimeException("Failed to read table data " + stage, e);
+    }
+  }
+  
+  /**
+   * Validate data consistency between original data and data after 
upgrade/downgrade.
+   * This ensures that upgrade/downgrade operations preserve data integrity.
+   */
+  private void validateDataConsistency(Dataset<Row> originalData, 
HoodieTableMetaClient metaClient, String stage) {
+    LOG.info("Validating data consistency {}", stage);
+    
+    try {
+      Dataset<Row> currentData = readTableData(metaClient, stage);
+      
+      // Exclude Hudi metadata columns
+      Set<String> hoodieMetadataColumns = new HashSet<>(Arrays.asList(
+          "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", 
+          "_hoodie_partition_path", "_hoodie_file_name"));
+      
+      Set<String> columnsToValidate = Arrays.stream(originalData.columns())
+          .filter(col -> !hoodieMetadataColumns.contains(col))
+          .collect(Collectors.toSet());
+      
+      if (columnsToValidate.isEmpty()) {
+        LOG.info("Skipping data consistency validation {} (no business columns 
to validate)", stage);
+        return;
+      }
+      
+      LOG.info("Validating data columns: {}", columnsToValidate);
+      boolean dataConsistent = areDataframesEqual(originalData, currentData, 
columnsToValidate);
+      assertTrue(dataConsistent, " data should be consistent between original 
and " + stage + " states");
+      
+      LOG.info("Data consistency validation passed {}", stage);
+    } catch (Exception e) {
+      throw new RuntimeException("Data consistency validation failed " + 
stage, e);
+    }
+  }
+
  /**
   * Smoke-checks that the table is readable at the given stage: loads and
   * materializes the table via {@code readTableData} and logs the row count.
   * Any failure is wrapped in a RuntimeException carrying the stage label.
   */
  private void performDataValidationOnTable(HoodieTableMetaClient metaClient, String stage) {
    LOG.info("Performing data validation on table {}", stage);

    try {
      Dataset<Row> tableData = readTableData(metaClient, stage);
      LOG.info("Data validation passed {} (table accessible, {} rows)", stage, tableData.count());
    } catch (Exception e) {
      throw new RuntimeException("Data validation failed " + stage, e);
    }
  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/README.md
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/README.md
new file mode 100644
index 000000000000..3ccafcd8ff65
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/README.md
@@ -0,0 +1,192 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+# Hudi Upgrade/Downgrade Test Fixtures
+
+This directory contains pre-created MOR Hudi tables from different releases 
used for testing upgrade/downgrade functionality.
+
+## Fixture Tables
+
+| Directory | Hudi Version | Table Version |
+|-----------|--------------|---------------|
+| `hudi-v4-table/` | 0.11.1       | 4 |
+| `hudi-v5-table/` | 0.12.2       | 5 |
+| `hudi-v6-table/` | 0.14.0       | 6 |
+| `hudi-v8-table/` | 1.0.2        | 8 |
+| `hudi-v9-table/` | 1.1.0        | 9 |
+
+## Table Schema
+
+All fixture tables use a consistent simple schema:
+- `id` (string) - Record identifier
+- `name` (string) - Record name  
+- `ts` (long) - Timestamp
+- `partition` (string) - Partition value
+
+## Table Structure
+
+Each fixture table contains:
+- 2-3 base files (parquet)
+- 2-3 log files 
+- Multiple committed instants
+- 1 pending/failed write (for rollback testing)
+- Basic .hoodie metadata structure
+
+## Generating Fixtures
+
+### Prerequisites
+- Java 8+ installed
+- Internet connection (for downloading Spark binaries and Hudi bundles via 
Maven)
+
+### Generation Process
+
+Use the `generate-fixtures.sh` script to create all fixture tables:
+
+```bash
+./generate-fixtures.sh
+```
+
+**Note**: The script will create fixture tables in the `mor-tables/` 
directory. On first run, it downloads and caches Spark binaries in the 
`spark-versions/` directory. Each fixture generation may take several minutes 
as it downloads Spark binaries and Hudi bundles, then creates table data.
+
+### Script Parameters
+
+The `generate-fixtures.sh` script supports the following parameters:
+
+| Parameter | Description | Required | Example |
+|-----------|-------------|----------|---------|
+| `--version <version_list>` | Comma-separated list of table versions to 
generate | No | `--version 4,5,6` |
+| `--hudi-bundle-path <path>` | Path to locally built Hudi bundle JAR 
(required for version 9) | Only for version 9 | `--hudi-bundle-path 
/path/to/bundle.jar` |
+
#### Supported Versions
- **4** - Hudi 0.11.1 (Spark 3.2.4, Scala 2.12)
- **5** - Hudi 0.12.2 (Spark 3.3.4, Scala 2.12)
- **6** - Hudi 0.14.0 (Spark 3.4.3, Scala 2.12)
- **8** - Hudi 1.0.2 (Spark 3.5.1, Scala 2.12)
- **9** - Hudi 1.1.0 (Spark 3.5.1, Scala 2.12) - **Requires local bundle**
+
+#### Usage Examples
+
+```bash
+# Generate all available versions (4,5,6,8) - version 9 excluded due to local 
bundle requirement
+./generate-fixtures.sh
+
+# Generate specific versions only
+./generate-fixtures.sh --version 4,5
+
+# Generate only version 6
+./generate-fixtures.sh --version 6
+
+# Generate version 9 (requires locally built Hudi bundle)
+./generate-fixtures.sh --version 9 --hudi-bundle-path 
/path/to/hudi-spark3.5-bundle_2.12-1.1.0-SNAPSHOT.jar
+
+# Generate multiple versions including version 9
+./generate-fixtures.sh --version 4,6,9 --hudi-bundle-path /path/to/bundle.jar
+```
+
+#### Version 9 Special Requirements
+
+Version 9 requires a locally built Hudi bundle since Hudi 1.1.0 is not yet 
officially released. To build the bundle:
+
+```bash
+# In your Hudi repository
+cd <hudi-repo>
+mvn clean install -DskipTests -Dspark3.5 -Dscala-2.12 -pl 
packaging/hudi-spark-bundle -am
+
+# Then use the generated bundle
+./generate-fixtures.sh --version 9 --hudi-bundle-path 
<hudi-repo>/packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.1.0-SNAPSHOT.jar
+```
+
+**Note**: If you try to generate version 9 without providing 
`--hudi-bundle-path`, the script will display detailed build instructions and 
exit with an error.
+
+### Spark Binaries and Compatibility Matrix
+
+The script downloads and caches official Apache Spark binaries with Hudi 
bundles resolved via the `--packages` flag:
+
| Hudi Version | Table Version | Spark Version | Scala Version | Downloaded Binary |
|--------------|---------------|---------------|---------------|-------------------|
| 0.11.1       | 4             | 3.2.4         | 2.12          | spark-3.2.4-bin-hadoop3.2.tgz |
| 0.12.2       | 5             | 3.3.4         | 2.12          | spark-3.3.4-bin-hadoop3.tgz |
| 0.14.0       | 6             | 3.4.3         | 2.12          | spark-3.4.3-bin-hadoop3.tgz |
| 1.0.2        | 8             | 3.5.1         | 2.12          | spark-3.5.1-bin-hadoop3.tgz |
| 1.1.0        | 9             | 3.5.1         | 2.12          | spark-3.5.1-bin-hadoop3.tgz |
+
+### Manual Generation Example
+
+The script uses a template-based approach with separate Scala files and 
variable substitution. Here's how to manually replicate the process:
+
+#### Hudi 0.11.1 (Version 4)
+```bash
+# 1. Download and extract Spark 3.2.4 binary (if not already present)
+mkdir -p spark-versions
+cd spark-versions
+wget 
https://archive.apache.org/dist/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz
+tar -xzf spark-3.2.4-bin-hadoop3.2.tgz
+cd ..
+
+# 2. Create custom Scala script from template
+cp scala-templates/generate-fixture.scala /tmp/generate_hudi-v4-table.scala
+
+# 3. Substitute template variables in the copied script
+sed -i.bak \
+    -e 's/${TABLE_NAME}/hudi-v4-table_table/g' \
+    -e 's|${BASE_PATH}|'$(pwd)'/mor-tables/hudi-v4-table|g' \
+    /tmp/generate_hudi-v4-table.scala
+
+# 4. Run spark-shell with the customized Scala script using -i flag
+./spark-versions/spark-3.2.4-bin-hadoop3.2/bin/spark-shell \
+  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
+  --conf 
'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
 \
+  --conf 
'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
+  --conf 'spark.jars.ivy=/tmp/ivy-cache-hudi-v4-table' \
+  --conf 'spark.sql.warehouse.dir=/tmp/spark-warehouse' \
+  --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1 \
+  -i /tmp/generate_hudi-v4-table.scala
+
+# 5. Clean up temporary files
+rm -f /tmp/generate_hudi-v4-table.scala /tmp/generate_hudi-v4-table.scala.bak
+rm -rf /tmp/ivy-cache-hudi-v4-table
+```
+
+**Note**: The Scala code itself is in `scala-templates/generate-fixture.scala` 
and contains template variables like `${TABLE_NAME}` and `${BASE_PATH}` that 
get replaced by the shell script.
+
+#### Other Versions
+For other versions, use the same template-based pattern but with the 
appropriate Spark binary and Hudi bundle version from the compatibility matrix 
above. The key differences are:
+
+- **Hudi 0.12.2 (Version 5)**:
+  - Spark binary: `./spark-versions/spark-3.3.1-bin-hadoop3/bin/spark-shell`
+  - Hudi bundle: `--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.2`
+  - Table name: `hudi-v5-table_table`, Base path: `mor-tables/hudi-v5-table`
+
+- **Hudi 0.14.0 (Version 6)**:
+  - Spark binary: `./spark-versions/spark-3.4.1-bin-hadoop3/bin/spark-shell`
+  - Hudi bundle: `--packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0`
+  - Table name: `hudi-v6-table_table`, Base path: `mor-tables/hudi-v6-table`
+
+- **Hudi 1.0.2 (Version 8)**:
+  - Spark binary: `./spark-versions/spark-3.5.4-bin-hadoop3/bin/spark-shell`
+  - Hudi bundle: `--packages org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.2`
+  - Table name: `hudi-v8-table_table`, Base path: `mor-tables/hudi-v8-table`
+
+- **Hudi 1.1.0 (Version 9)**: Requires `--jars <local-bundle-path>` instead of 
`--packages` (see version 9 requirements above)
+
+
+## Notes
+
+- Fixtures are copied to temporary directories during testing to avoid 
modifications
+- Each fixture should be self-contained with all necessary metadata
+- Keep fixtures minimal but realistic (small data sizes for fast tests)
+- Ensure consistent schema across all versions for compatibility testing
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/generate-fixtures.sh
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/generate-fixtures.sh
new file mode 100755
index 000000000000..526c08327932
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/generate-fixtures.sh
@@ -0,0 +1,341 @@
#!/bin/bash

#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.

# Script to generate Hudi MOR table fixtures for different versions
# Used by TestUpgradeDowngradeFixtures for integration testing

# Abort the whole script on the first failing command.
set -e

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
FIXTURES_DIR="$SCRIPT_DIR/mor-tables"

echo "Generating Hudi upgrade/downgrade test fixtures..."
echo "Fixtures directory: $FIXTURES_DIR"

# Parse command line arguments
REQUESTED_VERSIONS=""   # comma-separated table versions (e.g. "4,5,6"); empty => generate all
HUDI_BUNDLE_PATH=""     # local bundle JAR; only required when generating version 9
while [[ $# -gt 0 ]]; do
    case $1 in
        --version)
            REQUESTED_VERSIONS="$2"
            shift 2
            ;;
        --hudi-bundle-path)
            HUDI_BUNDLE_PATH="$2"
            shift 2
            ;;
        *)
            # Unknown flag: print usage and abort.
            echo "Unknown option: $1"
            echo "Usage: $0 [--version <version_list>] [--hudi-bundle-path <path>]"
            echo "  --version <version_list>          Comma-separated list of table versions to generate (e.g., 4,5,6)"
            echo "  --hudi-bundle-path <path>         Path to locally built Hudi bundle JAR (required for version 9)"
            echo ""
            echo "Examples:"
            echo "  $0                                           # Generate all versions (except 9)"
            echo "  $0 --version 4,5                             # Generate versions 4 and 5"
            echo "  $0 --version 9 --hudi-bundle-path /path/to/hudi-spark3.5-bundle_2.12-1.1.0-SNAPSHOT.jar"
            echo ""
            echo "Note: Version 9 requires a locally built Hudi bundle from master branch"
            exit 1
            ;;
    esac
done

# Convert comma-separated versions to array
if [ -n "$REQUESTED_VERSIONS" ]; then
    IFS=',' read -ra VERSION_ARRAY <<< "$REQUESTED_VERSIONS"
    echo "INFO: Generating specific versions: ${VERSION_ARRAY[@]}"
else
    echo "INFO: Generating all fixture versions"
fi
+
# Decide whether a given table version should be generated.
# Returns 0 (yes) when no explicit --version filter was supplied, or when the
# candidate appears in VERSION_ARRAY; returns 1 otherwise.
should_generate_version() {
    local candidate=$1
    # No filter requested => generate everything.
    if [ -z "$REQUESTED_VERSIONS" ]; then
        return 0
    fi
    local wanted
    for wanted in "${VERSION_ARRAY[@]}"; do
        [ "$wanted" = "$candidate" ] && return 0
    done
    return 1
}
+
# Validate version 9 requirements.
# Version 9 (Hudi 1.1.0) has no published Maven bundle, so a locally built
# spark-bundle JAR must be supplied via --hudi-bundle-path.
if should_generate_version "9"; then
    if [ -z "$HUDI_BUNDLE_PATH" ]; then
        echo "ERROR: Version 9 requires --hudi-bundle-path argument"
        echo ""
        echo "To generate table version 9, you need to:"
        echo "1. Build Hudi from master branch:"
        echo "   cd <hudi-repo>"
        echo "   mvn clean install -DskipTests -Dspark3.5 -Dscala-2.12 -pl packaging/hudi-spark-bundle -am"
        echo ""
        echo "2. Provide the path to the built bundle:"
        echo "   $0 --version 9 --hudi-bundle-path <hudi-repo>/packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.1.0-SNAPSHOT.jar"
        exit 1
    fi

    # The provided path must point at an existing JAR file.
    if [ ! -f "$HUDI_BUNDLE_PATH" ]; then
        echo "ERROR: Hudi bundle not found at: $HUDI_BUNDLE_PATH"
        exit 1
    fi

    echo "Using local Hudi bundle: $HUDI_BUNDLE_PATH"
fi
+
# Ensure the Spark binary for the given minor version ("3.2".."3.5") is
# available locally, downloading and caching it under spark-versions/ on
# first use.
# Output protocol: all progress/diagnostic messages go to stderr (>&2); the
# ONLY stdout output is the final Spark home path, so callers can capture it
# with command substitution.
ensure_spark_binary() {
    local spark_version=$1

    # Determine exact Spark version and tarball name
    case "$spark_version" in
        "3.2")
            local spark_full_version="3.2.4"
            local spark_tarball="spark-3.2.4-bin-hadoop3.2.tgz"
            local extracted_dirname="spark-3.2.4-bin-hadoop3.2"
            ;;
        "3.3")
            local spark_full_version="3.3.4"
            local spark_tarball="spark-3.3.4-bin-hadoop3.tgz"
            local extracted_dirname="spark-3.3.4-bin-hadoop3"
            ;;
        "3.4")
            local spark_full_version="3.4.3"
            local spark_tarball="spark-3.4.3-bin-hadoop3.tgz"
            local extracted_dirname="spark-3.4.3-bin-hadoop3"
            ;;
        "3.5")
            local spark_full_version="3.5.1"
            local spark_tarball="spark-3.5.1-bin-hadoop3.tgz"
            local extracted_dirname="spark-3.5.1-bin-hadoop3"
            ;;
        *)
            echo "ERROR: Unsupported Spark version: $spark_version"
            exit 1
            ;;
    esac

    local spark_dir="$SCRIPT_DIR/spark-versions/spark-${spark_full_version}"
    local spark_bin="$spark_dir/bin/spark-shell"

    # Check if Spark binary already exists (cache hit: skip the download).
    if [ -f "$spark_bin" ]; then
        echo "Spark $spark_full_version already available at $spark_dir" >&2
        echo "$spark_dir"
        return 0
    fi

    echo "Downloading Spark $spark_full_version..." >&2

    local download_url="https://archive.apache.org/dist/spark/spark-${spark_full_version}/${spark_tarball}"
    local temp_tarball="/tmp/${spark_tarball}"

    # Download Spark binary
    # NOTE(review): `exit 1` here runs inside the caller's command
    # substitution subshell; it terminates only the subshell, not the whole
    # script -- the caller must check the substitution's exit status.
    if ! curl -L -o "$temp_tarball" "$download_url"; then
        echo "ERROR: Failed to download Spark $spark_full_version" >&2
        exit 1
    fi

    # Extract to spark-versions directory
    echo "Extracting Spark $spark_full_version..." >&2
    mkdir -p "$SCRIPT_DIR/spark-versions"

    if ! tar -xzf "$temp_tarball" -C "$SCRIPT_DIR/spark-versions/"; then
        echo "ERROR: Failed to extract Spark $spark_full_version" >&2
        rm -f "$temp_tarball"
        exit 1
    fi

    # Rename extracted directory to our expected name
    local extracted_dir="$SCRIPT_DIR/spark-versions/$extracted_dirname"
    if [ -d "$extracted_dir" ]; then
        mv "$extracted_dir" "$spark_dir"
    fi

    # Clean up tarball
    rm -f "$temp_tarball"

    # Add a small delay to ensure filesystem sync
    echo "Waiting for filesystem sync..." >&2
    sleep 1

    echo "Spark $spark_full_version ready at $spark_dir" >&2
    echo "$spark_dir"
}
+
# Generate one fixture table for a specific Hudi release.
# Args:
#   $1 hudi_version   - Hudi release (e.g. "0.11.1")
#   $2 table_version  - Hudi table version (e.g. "4")
#   $3 fixture_name   - output directory name under mor-tables/
#   $4 spark_version  - Spark minor version (e.g. "3.2")
#   $5 scala_version  - Scala binary version (e.g. "2.12")
generate_fixture() {
    local hudi_version=$1
    local table_version=$2
    local fixture_name=$3
    local spark_version=$4
    local scala_version=$5

    echo "📊 Generating fixture: $fixture_name (Hudi $hudi_version, Table Version $table_version)"
    echo "   Using Spark $spark_version with Scala $scala_version"

    local fixture_path="$FIXTURES_DIR/$fixture_name"

    # Clean existing fixture directory to prevent table type conflicts
    if [ -d "$fixture_path" ]; then
        echo "Cleaning existing fixture directory: $fixture_path"
        rm -rf "$fixture_path"
    fi

    mkdir -p "$fixture_path"

    # Ensure Spark binary is available.
    # BUGFIX: declare and assign separately -- `local var=$(cmd)` makes $?
    # reflect `local` (always 0), masking a failed download inside the
    # command substitution so `set -e` never fires. Assigning on its own
    # line preserves the substitution's exit status.
    local spark_home
    spark_home=$(ensure_spark_binary "$spark_version") || exit 1
    echo "DEBUG: spark_home=[$spark_home]"

    # Validate Spark installation
    if [ ! -f "$spark_home/bin/spark-shell" ]; then
        echo "ERROR: spark-shell not found at $spark_home/bin/spark-shell"
        echo "   Directory contents:"
        ls -la "$spark_home/bin/" || echo "   Directory does not exist"
        exit 1
    fi
    echo "Validated spark-shell exists at: $spark_home/bin/spark-shell"

    # Prepare Scala script from template
    local table_name="${fixture_name}_table"
    local temp_script="/tmp/generate_${fixture_name}.scala"

    # Copy template and substitute variables
    cp "$SCRIPT_DIR/scala-templates/generate-fixture.scala" "$temp_script"
    sed -i.bak \
        -e "s/\${FIXTURE_NAME}/$fixture_name/g" \
        -e "s/\${TABLE_NAME}/$table_name/g" \
        -e "s|\${BASE_PATH}|$fixture_path|g" \
        "$temp_script"
    rm -f "${temp_script}.bak"

    # Run Spark shell directly to generate the fixture
    echo "Running Spark shell directly from: $spark_home"

    # Create temporary directory for Ivy cache to avoid permission issues
    local ivy_cache_dir="/tmp/ivy-cache-${fixture_name}"
    mkdir -p "$ivy_cache_dir"

    # Handle version 9 specially (use local JAR instead of Maven)
    if [ "$table_version" = "9" ]; then
        echo "Using local Hudi bundle: $HUDI_BUNDLE_PATH"
        "$spark_home/bin/spark-shell" \
            --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
            --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
            --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
            --conf "spark.jars.ivy=$ivy_cache_dir" \
            --conf 'spark.sql.warehouse.dir=/tmp/spark-warehouse' \
            --jars "$HUDI_BUNDLE_PATH" -i "$temp_script"
    else
        # Use Maven packages for official releases
        local hudi_bundle="org.apache.hudi:hudi-spark${spark_version}-bundle_${scala_version}:${hudi_version}"
        echo "Using Hudi bundle: $hudi_bundle"
        "$spark_home/bin/spark-shell" \
            --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
            --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
            --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
            --conf "spark.jars.ivy=$ivy_cache_dir" \
            --conf 'spark.sql.warehouse.dir=/tmp/spark-warehouse' \
            --packages "$hudi_bundle" -i "$temp_script"
    fi

    # Clean up ivy cache directory
    rm -rf "$ivy_cache_dir"

    # Clean up temp script (use -f so a missing file does not trip `set -e`)
    rm -f "$temp_script"

    echo "Fixture $fixture_name generated successfully"
}
+
# Generate fixtures for each version using appropriate Spark versions
echo "Generating fixtures for all supported versions..."

# Based on Hudi-Spark compatibility matrix:
# 1.0.x    -> 3.5.x (default)
# 0.15.x   -> 3.5.x (default)
# 0.14.x   -> 3.4.x (default)
# 0.13.x   -> 3.3.x (default)
# 0.12.x   -> 3.3.x (default)
# 0.11.x   -> 3.2.x (default)

echo "Hudi Version -> Spark Version -> Scala Version mapping:"

# Hudi 0.11.1 (Table Version 4) -> Spark 3.2.x (default) -> Scala 2.12
if should_generate_version "4"; then
    echo "   0.11.1 -> Spark 3.2 -> Scala 2.12"
    generate_fixture "0.11.1" "4" "hudi-v4-table" "3.2" "2.12"
fi

# Hudi 0.12.2 (Table Version 5) -> Spark 3.3.x (default) -> Scala 2.12
if should_generate_version "5"; then
    echo "   0.12.2 -> Spark 3.3 -> Scala 2.12"
    generate_fixture "0.12.2" "5" "hudi-v5-table" "3.3" "2.12"
fi

# Hudi 0.14.0 (Table Version 6) -> Spark 3.4.x (default) -> Scala 2.12
if should_generate_version "6"; then
    echo "   0.14.0 -> Spark 3.4 -> Scala 2.12"
    generate_fixture "0.14.0" "6" "hudi-v6-table" "3.4" "2.12"
fi

# Hudi 1.0.2 (Table Version 8) -> Spark 3.5.x (default) -> Scala 2.12
if should_generate_version "8"; then
    echo "   1.0.2 -> Spark 3.5 -> Scala 2.12"
    generate_fixture "1.0.2" "8" "hudi-v8-table" "3.5" "2.12"
fi

# Hudi 1.1.0 (Table Version 9) -> Spark 3.5.x (default) -> Scala 2.12 (requires local bundle)
if should_generate_version "9"; then
    echo "   1.1.0 -> Spark 3.5 -> Scala 2.12 (using local bundle)"
    generate_fixture "1.1.0" "9" "hudi-v9-table" "3.5" "2.12"
fi


echo ""
echo "Fixture generation completed!"

# Compress fixture tables to save space; the uncompressed directory is
# removed only after a successful zip.
echo ""
echo "Compressing fixture tables..."
for fixture_dir in "$FIXTURES_DIR"/hudi-v*-table; do
    if [ -d "$fixture_dir" ]; then
        fixture_name=$(basename "$fixture_dir")
        echo "Compressing $fixture_name..."
        (cd "$FIXTURES_DIR" && zip -r -q -X "${fixture_name}.zip" "$fixture_name")
        # NOTE(review): under `set -e` a failing zip aborts the script before
        # this check runs, so the error branch below is effectively dead code
        # -- confirm whether that is intended.
        if [ $? -eq 0 ]; then
            rm -rf "$fixture_dir"
            echo "Created ${fixture_name}.zip"
        else
            echo "ERROR: Failed to compress $fixture_name"
            exit 1
        fi
    fi
done

echo ""
echo "Compression completed!"
echo "Generated compressed fixtures:"
find "$FIXTURES_DIR" -name "hudi-v*-table.zip" | sort
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v4-table.zip
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v4-table.zip
new file mode 100644
index 000000000000..154efa99c8c1
Binary files /dev/null and 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v4-table.zip
 differ
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v5-table.zip
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v5-table.zip
new file mode 100644
index 000000000000..0e6c5833b69f
Binary files /dev/null and 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v5-table.zip
 differ
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v6-table.zip
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v6-table.zip
new file mode 100644
index 000000000000..b571f1efde12
Binary files /dev/null and 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v6-table.zip
 differ
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v8-table.zip
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v8-table.zip
new file mode 100644
index 000000000000..d93dca2ebbe8
Binary files /dev/null and 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v8-table.zip
 differ
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v9-table.zip
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v9-table.zip
new file mode 100644
index 000000000000..33dc309c3443
Binary files /dev/null and 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v9-table.zip
 differ
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/scala-templates/generate-fixture.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/scala-templates/generate-fixture.scala
new file mode 100644
index 000000000000..fbefed9e05df
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/scala-templates/generate-fixture.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.hudi.DataSourceWriteOptions._
import spark.implicits._

// ${TABLE_NAME} / ${BASE_PATH} / ${FIXTURE_NAME} are shell-style placeholders
// substituted by generate-fixtures.sh before this script is fed to spark-shell;
// they are not Scala interpolation.
val tableName = "${TABLE_NAME}"
val basePath = "${BASE_PATH}"

/**
 * Writes a batch to the fixture MOR table with the shared Hudi configuration.
 *
 * The deliberately tiny parquet limits (50KB max file size, 25KB small-file
 * threshold) force Hudi to produce several small base files from only a
 * handful of rows, so the resulting fixture exercises multi-file-group code
 * paths during upgrade/downgrade tests.
 *
 * @param df        batch to persist
 * @param operation Hudi write operation ("insert" or "upsert")
 * @param mode      Spark save mode (Overwrite for the first batch, Append after)
 */
def writeHudi(df: DataFrame, operation: String, mode: SaveMode): Unit = {
  df.write.format("hudi").
    option(PRECOMBINE_FIELD.key, "ts").
    option(RECORDKEY_FIELD.key, "id").
    option(PARTITIONPATH_FIELD.key, "partition").
    option("hoodie.table.name", tableName).
    option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
    option("hoodie.datasource.write.operation", operation).
    option("hoodie.index.bloom.num_entries", "20").
    option("hoodie.bloom.index.false.positive.rate", "0.1").
    option("hoodie.parquet.max.file.size", "51200"). // 50KB max file size
    option("hoodie.parquet.small.file.limit", "25600"). // 25KB small file threshold
    option("hoodie.parquet.compression.codec", "snappy").
    mode(mode).
    save(basePath)
}

// Batch 1: initial inserts — creates the base files of the MOR table.
val testData = Seq(
  ("id1", "Alice", 1000L, "2023-01-01"),
  ("id2", "Bob", 2000L, "2023-01-01"),
  ("id3", "Charlie", 3000L, "2023-01-02"),
  ("id4", "David", 4000L, "2023-01-02"),
  ("id5", "Eve", 5000L, "2023-01-03")
)
val df = testData.toDF("id", "name", "ts", "partition")

writeHudi(df, "insert", SaveMode.Overwrite)
println("Initial batch written")

// Batch 2: upserts — updates two existing keys and adds one new key,
// producing log files alongside the base files.
val updateData = Seq(
  ("id1", "Alice_Updated", 1001L, "2023-01-01"),
  ("id2", "Bob_Updated", 2001L, "2023-01-01"),
  ("id6", "Frank", 6000L, "2023-01-03")
)
val updateDf = updateData.toDF("id", "name", "ts", "partition")

writeHudi(updateDf, "upsert", SaveMode.Append)
println("Update batch written")

// Batch 3: a further insert so the fixture timeline contains multiple commits.
val insertData = Seq(
  ("id7", "Grace", 7000L, "2023-01-04"),
  ("id8", "Henry", 8000L, "2023-01-04")
)
val insertDf = insertData.toDF("id", "name", "ts", "partition")

writeHudi(insertDf, "insert", SaveMode.Append)
println("Additional insert written")
println(s"Fixture ${FIXTURE_NAME} generated successfully!")

// Exit explicitly so the non-interactive spark-shell session terminates.
System.exit(0)

Reply via email to