This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b22e8d9558ea [HUDI-9684] Add Upgrade/Downgrade Test Fixtures Framework
(#13669)
b22e8d9558ea is described below
commit b22e8d9558ea62a439eb04960d721263728891be
Author: Rahil C <[email protected]>
AuthorDate: Mon Aug 4 18:39:14 2025 -0700
[HUDI-9684] Add Upgrade/Downgrade Test Fixtures Framework (#13669)
---
.gitignore | 3 +
...ngrade.java => TestUpgradeDowngradeLegacy.java} | 3 +-
.../table/read/TestHoodieFileGroupReaderBase.java | 27 +-
.../hudi/common/testutils/HoodieTestUtils.java | 50 ++
.../hudi/table/upgrade/TestUpgradeDowngrade.java | 586 +++++++++++++++++++++
.../resources/upgrade-downgrade-fixtures/README.md | 192 +++++++
.../generate-fixtures.sh | 341 ++++++++++++
.../mor-tables/hudi-v4-table.zip | Bin 0 -> 55017 bytes
.../mor-tables/hudi-v5-table.zip | Bin 0 -> 51947 bytes
.../mor-tables/hudi-v6-table.zip | Bin 0 -> 55824 bytes
.../mor-tables/hudi-v8-table.zip | Bin 0 -> 122258 bytes
.../mor-tables/hudi-v9-table.zip | Bin 0 -> 123797 bytes
.../scala-templates/generate-fixture.scala | 106 ++++
13 files changed, 1281 insertions(+), 27 deletions(-)
diff --git a/.gitignore b/.gitignore
index daf30008a5de..05fb43d452aa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -87,3 +87,6 @@ dependency-reduced-pom.xml
hudi-integ-test/compose_env
node_modules
package-lock.json
+
+# Spark Binaries used for testing
+hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/spark-versions/
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeLegacy.java
similarity index 99%
rename from
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
rename to
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeLegacy.java
index a54d09caee06..245af8e4b3ca 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeLegacy.java
@@ -110,8 +110,9 @@ import static org.mockito.Mockito.when;
/**
* Unit tests {@link UpgradeDowngrade}.
+ * New tests for upgrade and downgrade operations should be added to
TestUpgradeDowngrade using the test fixtures.
*/
-public class TestUpgradeDowngrade extends HoodieClientTestBase {
+public class TestUpgradeDowngradeLegacy extends HoodieClientTestBase {
private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with
deletePartialMarkerFiles={0} and TableType = {1}";
private static final String TEST_NAME_WITH_DOWNGRADE_PARAMS = "[{index}]
Test with deletePartialMarkerFiles={0} and TableType = {1} and "
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index e681ba8a8691..147b677ee72a 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -71,8 +71,6 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
-import java.io.BufferedOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
@@ -90,8 +88,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
import static
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
@@ -540,7 +536,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
@Test
public void testReadFileGroupInBootstrapMergeOnReadTable() throws Exception {
Path zipOutput = Paths.get(new URI(getBasePath()));
- extract(zipOutput);
+
HoodieTestUtils.extractZipToDirectory("file-group-reader/bootstrap_data.zip",
zipOutput, getClass());
ObjectMapper objectMapper = new ObjectMapper();
Path basePath = zipOutput.resolve("bootstrap_data");
List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords = new
ArrayList<>();
@@ -954,25 +950,4 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
return partitionPath;
}
- private void extract(Path target) throws IOException {
- try (ZipInputStream zip = new
ZipInputStream(this.getClass().getClassLoader().getResourceAsStream("file-group-reader/bootstrap_data.zip")))
{
- ZipEntry entry;
-
- while ((entry = zip.getNextEntry()) != null) {
- File file = target.resolve(entry.getName()).toFile();
- if (entry.isDirectory()) {
- file.mkdirs();
- continue;
- }
- byte[] buffer = new byte[10000];
- file.getParentFile().mkdirs();
- try (BufferedOutputStream out = new
BufferedOutputStream(Files.newOutputStream(file.toPath()))) {
- int count;
- while ((count = zip.read(buffer)) != -1) {
- out.write(buffer, 0, count);
- }
- }
- }
- }
- }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 6bd86f60a95f..9ee9f1ba8be6 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -63,10 +63,17 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.jupiter.api.Assumptions;
+import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -444,6 +451,49 @@ public class HoodieTestUtils {
return deleteFileList;
}
+ /**
+ * Extracts a ZIP file from resources to a target directory.
+ *
+ * @param resourcePath the path to the ZIP resource (relative to classpath)
+ * @param targetDirectory the target directory to extract files to
+ * @param resourceClass the class to use for resource loading
+ * @throws IOException if extraction fails
+ */
+ public static void extractZipToDirectory(String resourcePath, Path
targetDirectory, Class<?> resourceClass) throws IOException {
+ InputStream resourceStream =
resourceClass.getClassLoader().getResourceAsStream(resourcePath);
+ if (resourceStream == null) {
+ // Fallback to getResourceAsStream if
getClassLoader().getResourceAsStream() fails
+ resourceStream = resourceClass.getResourceAsStream(resourcePath);
+ }
+
+ if (resourceStream == null) {
+ throw new IOException("Resource not found at: " + resourcePath);
+ }
+
+ try (ZipInputStream zip = new ZipInputStream(resourceStream)) {
+ ZipEntry entry;
+ while ((entry = zip.getNextEntry()) != null) {
+ File file = targetDirectory.resolve(entry.getName()).toFile();
+ if (entry.isDirectory()) {
+ file.mkdirs();
+ continue;
+ }
+
+ // Create parent directories if they don't exist
+ file.getParentFile().mkdirs();
+
+ // Extract file content
+ byte[] buffer = new byte[10000];
+ try (BufferedOutputStream out = new
BufferedOutputStream(Files.newOutputStream(file.toPath()))) {
+ int count;
+ while ((count = zip.read(buffer)) != -1) {
+ out.write(buffer, 0, count);
+ }
+ }
+ }
+ }
+ }
+
public static void validateTableConfig(HoodieStorage storage,
String basePath,
Map<String, String> expectedConfigs,
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
new file mode 100644
index 000000000000..467ef97f45fc
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for upgrade/downgrade operations using pre-created fixture tables
+ * from different Hudi releases.
+ */
+public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TestUpgradeDowngrade.class);
+ private static final String FIXTURES_BASE_PATH =
"/upgrade-downgrade-fixtures/mor-tables/";
+
+ @TempDir
+ java.nio.file.Path tempDir;
+
+ private HoodieTableMetaClient metaClient;
+
+ @ParameterizedTest
+ @MethodSource("upgradeDowngradeVersionPairs")
+ public void testUpgradeOrDowngrade(HoodieTableVersion fromVersion,
HoodieTableVersion toVersion) throws Exception {
+ boolean isUpgrade = fromVersion.lesserThan(toVersion);
+ String operation = isUpgrade ? "upgrade" : "downgrade";
+ LOG.info("Testing {} from version {} to {}", operation, fromVersion,
toVersion);
+
+ HoodieTableMetaClient originalMetaClient = loadFixtureTable(fromVersion);
+ assertEquals(fromVersion,
originalMetaClient.getTableConfig().getTableVersion(),
+ "Fixture table should be at expected version");
+
+ HoodieWriteConfig config = createWriteConfig(originalMetaClient, true);
+
+ int initialPendingCommits =
originalMetaClient.getCommitsTimeline().filterPendingExcludingCompaction().countInstants();
+ int initialCompletedCommits =
originalMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants();
+
+ Dataset<Row> originalData = readTableData(originalMetaClient, "before " +
operation);
+
+ new UpgradeDowngrade(originalMetaClient, config, context(),
SparkUpgradeDowngradeHelper.getInstance())
+ .run(toVersion, null);
+
+ HoodieTableMetaClient resultMetaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf().newInstance())
+ .setBasePath(originalMetaClient.getBasePath())
+ .build();
+
+ assertTableVersionOnDataAndMetadataTable(resultMetaClient, toVersion);
+ validateVersionSpecificProperties(resultMetaClient, fromVersion,
toVersion);
+ validateDataConsistency(originalData, resultMetaClient, "after " +
operation);
+
+ // Validate pending commits based on whether this transition performs
rollback operations
+ int finalPendingCommits =
resultMetaClient.getCommitsTimeline().filterPendingExcludingCompaction().countInstants();
+ if (isRollbackTransition(fromVersion, toVersion)) {
+ // Handlers that call rollbackFailedWritesAndCompact() clear all pending
commits
+ assertEquals(0, finalPendingCommits,
+ "Pending commits should be cleared to 0 after " + operation);
+ } else {
+ // Other handlers may clean up some pending commits but don't
necessarily clear all
+ assertTrue(finalPendingCommits <= initialPendingCommits,
+ "Pending commits should be cleaned up or reduced after " +
operation);
+ }
+
+ int finalCompletedCommits =
resultMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants();
+ assertTrue(finalCompletedCommits >= initialCompletedCommits,
+ "Completed commits should be preserved or increased after " +
operation);
+
+ LOG.info("Successfully completed {} test for version {} -> {}", operation,
fromVersion, toVersion);
+ }
+
+ @ParameterizedTest
+ @MethodSource("tableVersions")
+ public void testAutoUpgradeDisabled(HoodieTableVersion originalVersion)
throws Exception {
+ LOG.info("Testing auto-upgrade disabled for version {}", originalVersion);
+
+ HoodieTableMetaClient originalMetaClient =
loadFixtureTable(originalVersion);
+
+ Option<HoodieTableVersion> targetVersionOpt =
getNextVersion(originalVersion);
+ if (!targetVersionOpt.isPresent()) {
+ LOG.info("Skipping auto-upgrade test for version {} (no higher version
available)", originalVersion);
+ return;
+ }
+ HoodieTableVersion targetVersion = targetVersionOpt.get();
+
+ HoodieWriteConfig config = createWriteConfig(originalMetaClient, false);
+
+ // Attempt upgrade with auto-upgrade disabled
+ new UpgradeDowngrade(originalMetaClient, config, context(),
SparkUpgradeDowngradeHelper.getInstance())
+ .run(targetVersion, null);
+
+ // Create fresh meta client to validate that version remained unchanged
+ HoodieTableMetaClient unchangedMetaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf().newInstance())
+ .setBasePath(originalMetaClient.getBasePath())
+ .build();
+ assertEquals(originalVersion,
unchangedMetaClient.getTableConfig().getTableVersion(),
+ "Table version should remain unchanged when auto-upgrade is disabled");
+ validateVersionSpecificProperties(unchangedMetaClient, originalVersion);
+ performDataValidationOnTable(unchangedMetaClient, "after auto-upgrade
disabled test");
+
+ LOG.info("Auto-upgrade disabled test passed for version {}",
originalVersion);
+ }
+
+ /**
+ * Load a fixture table from resources and copy it to a temporary location
for testing.
+ */
+ private HoodieTableMetaClient loadFixtureTable(HoodieTableVersion version)
throws IOException {
+ String fixtureName = getFixtureName(version);
+ String resourcePath = FIXTURES_BASE_PATH + fixtureName;
+
+ LOG.info("Loading fixture from resource path: {}", resourcePath);
+ HoodieTestUtils.extractZipToDirectory(resourcePath, tempDir, getClass());
+
+ String tableName = fixtureName.replace(".zip", "");
+ String tablePath = tempDir.resolve(tableName).toString();
+
+ metaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf().newInstance())
+ .setBasePath(tablePath)
+ .build();
+
+ LOG.info("Loaded fixture table {} at version {}", fixtureName,
metaClient.getTableConfig().getTableVersion());
+ return metaClient;
+ }
+
+ private HoodieWriteConfig createWriteConfig(HoodieTableMetaClient
metaClient, boolean autoUpgrade) {
+ Properties props = new Properties();
+ HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
+ .withPath(metaClient.getBasePath().toString())
+ .withAutoUpgradeVersion(autoUpgrade)
+ .withProps(props);
+
+ // For validation operations, keep timeline server disabled for simplicity
+ if (!autoUpgrade) {
+ builder.withEmbeddedTimelineServerEnabled(false);
+ }
+
+ return builder.build();
+ }
+
+ private Option<HoodieTableVersion> getNextVersion(HoodieTableVersion
current) {
+ switch (current) {
+ case FOUR:
+ return Option.of(HoodieTableVersion.FIVE);
+ case FIVE:
+ return Option.of(HoodieTableVersion.SIX);
+ case SIX:
+ // even though there is a table version 7, this is not an official
release and serves as a bridge
+ // so the next version should be 8
+ return Option.of(HoodieTableVersion.EIGHT);
+ case EIGHT:
+ return Option.of(HoodieTableVersion.NINE);
+ case NINE:
+ default:
+ return Option.empty();
+ }
+ }
+
+ /**
+ * Get fixture zip file name for a given table version.
+ */
+ private String getFixtureName(HoodieTableVersion version) {
+ switch (version) {
+ case FOUR:
+ return "hudi-v4-table.zip";
+ case FIVE:
+ return "hudi-v5-table.zip";
+ case SIX:
+ return "hudi-v6-table.zip";
+ case EIGHT:
+ return "hudi-v8-table.zip";
+ case NINE:
+ return "hudi-v9-table.zip";
+ default:
+ throw new IllegalArgumentException("Unsupported fixture version: " +
version);
+ }
+ }
+
+ private static Stream<Arguments> tableVersions() {
+ return Stream.of(
+ Arguments.of(HoodieTableVersion.FOUR), // Hudi 0.11.1
+ Arguments.of(HoodieTableVersion.FIVE), // Hudi 0.12.2
+ Arguments.of(HoodieTableVersion.SIX), // Hudi 0.14
+ Arguments.of(HoodieTableVersion.EIGHT), // Hudi 1.0.2
+ Arguments.of(HoodieTableVersion.NINE) // Hudi 1.1
+ );
+ }
+
+ private static Stream<Arguments> upgradeDowngradeVersionPairs() {
+ return Stream.of(
+ // Upgrade test cases
+ Arguments.of(HoodieTableVersion.FOUR, HoodieTableVersion.FIVE), //
V4 -> V5
+ Arguments.of(HoodieTableVersion.FIVE, HoodieTableVersion.SIX), //
V5 -> V6
+ Arguments.of(HoodieTableVersion.SIX, HoodieTableVersion.EIGHT), //
V6 -> V8
+ Arguments.of(HoodieTableVersion.EIGHT, HoodieTableVersion.NINE), //
V8 -> V9
+
+ // Downgrade test cases
+ Arguments.of(HoodieTableVersion.NINE, HoodieTableVersion.EIGHT), //
V9 -> V8
+ Arguments.of(HoodieTableVersion.EIGHT, HoodieTableVersion.SIX), //
V8 -> V6
+ Arguments.of(HoodieTableVersion.SIX, HoodieTableVersion.FIVE), //
V6 -> V5
+ Arguments.of(HoodieTableVersion.FIVE, HoodieTableVersion.FOUR) //
V5 -> V4
+ );
+ }
+
+ /**
+ * Assert table version on both data table and metadata table (if exists).
+ */
+ private void assertTableVersionOnDataAndMetadataTable(
+ HoodieTableMetaClient metaClient, HoodieTableVersion expectedVersion)
throws IOException {
+ assertTableVersion(metaClient, expectedVersion);
+ if (expectedVersion.greaterThanOrEquals(HoodieTableVersion.FOUR)) {
+ StoragePath metadataTablePath =
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath());
+ if (metaClient.getStorage().exists(metadataTablePath)) {
+ LOG.info("Verifying metadata table version at: {}", metadataTablePath);
+ HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
+
.setConf(metaClient.getStorageConf().newInstance()).setBasePath(metadataTablePath).build();
+ assertTableVersion(mdtMetaClient, expectedVersion);
+ } else {
+ LOG.info("Metadata table does not exist at: {}", metadataTablePath);
+ }
+ }
+ }
+
+ private void assertTableVersion(
+ HoodieTableMetaClient metaClient, HoodieTableVersion expectedVersion) {
+ assertEquals(expectedVersion,
+ metaClient.getTableConfig().getTableVersion());
+ }
+
+ /**
+ * Validate version-specific properties after upgrade/downgrade operations.
+ */
+ private void validateVersionSpecificProperties(
+ HoodieTableMetaClient metaClient, HoodieTableVersion fromVersion,
HoodieTableVersion toVersion) throws IOException {
+ LOG.info("Validating version-specific properties: {} -> {}", fromVersion,
toVersion);
+
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
+
+ // Validate properties for target version
+ switch (toVersion) {
+ case FOUR:
+ validateVersion4Properties(metaClient, tableConfig);
+ break;
+ case FIVE:
+ validateVersion5Properties(metaClient, tableConfig);
+ break;
+ case SIX:
+ validateVersion6Properties(metaClient);
+ break;
+ case EIGHT:
+ validateVersion8Properties(tableConfig);
+ break;
+ case NINE:
+ validateVersion9Properties(metaClient, tableConfig);
+ break;
+ default:
+ LOG.warn("No specific property validation for version {}", toVersion);
+ }
+ }
+
+ /**
+ * Validate version-specific properties for a single table version.
+ */
+ private void validateVersionSpecificProperties(
+ HoodieTableMetaClient metaClient, HoodieTableVersion version) throws
IOException {
+ LOG.info("Validating version-specific properties for version {}", version);
+
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
+
+ // Validate properties for the version
+ switch (version) {
+ case FOUR:
+ validateVersion4Properties(metaClient, tableConfig);
+ break;
+ case FIVE:
+ validateVersion5Properties(metaClient, tableConfig);
+ break;
+ case SIX:
+ validateVersion6Properties(metaClient);
+ break;
+ case EIGHT:
+ validateVersion8Properties(tableConfig);
+ break;
+ case NINE:
+ validateVersion9Properties(metaClient, tableConfig);
+ break;
+ default:
+ LOG.warn("No specific property validation for version {}", version);
+ }
+ }
+
+ private void validateVersion4Properties(HoodieTableMetaClient metaClient,
HoodieTableConfig tableConfig) throws IOException {
+ assertTrue(tableConfig.contains(HoodieTableConfig.TABLE_CHECKSUM),
+ "TABLE_CHECKSUM should be set for V4");
+ String actualChecksum =
tableConfig.getString(HoodieTableConfig.TABLE_CHECKSUM);
+ assertNotNull(actualChecksum, "TABLE_CHECKSUM should not be null");
+
+ // Validate that the checksum is valid by comparing with computed checksum
+ String expectedChecksum =
String.valueOf(HoodieTableConfig.generateChecksum(tableConfig.getProps()));
+ assertEquals(expectedChecksum, actualChecksum,
+ "TABLE_CHECKSUM should match computed checksum");
+
+ assertEquals(TimelineLayoutVersion.LAYOUT_VERSION_1,
tableConfig.getTimelineLayoutVersion().get());
+
+ // TABLE_METADATA_PARTITIONS should be properly set if present
+ // Note: This is optional based on whether metadata table was enabled
during upgrade
+ // After downgrade operations, metadata table may be deleted, so we check
if it exists first
+ if (tableConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS)) {
+ if (isMetadataTablePresent(metaClient)) {
+ // Metadata table exists - enforce strict validation
+ String metadataPartitions =
tableConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS);
+ assertTrue(metadataPartitions.contains("files"),
+ "TABLE_METADATA_PARTITIONS should contain 'files' partition when
metadata table exists");
+ } else {
+ // Metadata table doesn't exist (likely after downgrade) - validation
not applicable
+ LOG.info("Skipping TABLE_METADATA_PARTITIONS 'files' validation -
metadata table does not exist (likely after downgrade operation)");
+ }
+ }
+ }
+
+ private boolean isMetadataTablePresent(HoodieTableMetaClient metaClient)
throws IOException {
+ StoragePath metadataTablePath =
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath());
+ return metaClient.getStorage().exists(metadataTablePath);
+ }
+
+ /**
+ * Determine if a version transition performs rollback operations that clear
all pending commits.
+ * These handlers call rollbackFailedWritesAndCompact() which clears pending
commits to 0.
+ */
+ private boolean isRollbackTransition(HoodieTableVersion fromVersion,
HoodieTableVersion toVersion) {
+ // Upgrade handlers that perform rollbacks
+ if (fromVersion == HoodieTableVersion.SEVEN && toVersion ==
HoodieTableVersion.EIGHT) {
+ return true; // SevenToEightUpgradeHandler
+ }
+ if (fromVersion == HoodieTableVersion.EIGHT && toVersion ==
HoodieTableVersion.NINE) {
+ return true; // EightToNineUpgradeHandler
+ }
+
+ // Downgrade handlers that perform rollbacks
+ if (fromVersion == HoodieTableVersion.SIX && toVersion ==
HoodieTableVersion.FIVE) {
+ return true; // SixToFiveDowngradeHandler
+ }
+ if (fromVersion == HoodieTableVersion.EIGHT && toVersion ==
HoodieTableVersion.SEVEN) {
+ return true; // EightToSevenDowngradeHandler
+ }
+ if (fromVersion == HoodieTableVersion.NINE && toVersion ==
HoodieTableVersion.EIGHT) {
+ return true; // NineToEightDowngradeHandler
+ }
+
+ return false; // All other transitions don't perform rollbacks
+ }
+
+ private void validateVersion5Properties(HoodieTableMetaClient metaClient,
HoodieTableConfig tableConfig) throws IOException {
+
+ validateVersion4Properties(metaClient, tableConfig);
+ // Version 5 upgrade validates that no deprecated default partition paths
exist
+ // The upgrade handler checks for DEPRECATED_DEFAULT_PARTITION_PATH
("default")
+ // and requires migration to DEFAULT_PARTITION_PATH
("__HIVE_DEFAULT_PARTITION__")
+
+ // If table is partitioned, validate partition path migration
+ if (tableConfig.isTablePartitioned()) {
+ LOG.info("Validating V5 partition path migration for partitioned table");
+
+ // Check hive-style partitioning configuration
+ boolean hiveStylePartitioningEnable =
Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable());
+ LOG.info("Hive-style partitioning enabled: {}",
hiveStylePartitioningEnable);
+
+ // Validate partition field configuration exists
+ assertTrue(tableConfig.getPartitionFields().isPresent(),
+ "Partition fields should be present for partitioned table in V5");
+ } else {
+ LOG.info("Non-partitioned table - skipping partition path validation for
V5");
+ }
+ }
+
+ private void validateVersion6Properties(HoodieTableMetaClient metaClient)
throws IOException {
+ // Version 6 upgrade deletes compaction requested files from .aux folder
(HUDI-6040)
+ // Validate that no REQUESTED compaction files remain in auxiliary folder
+ validateVersion5Properties(metaClient, metaClient.getTableConfig());
+
+ StoragePath auxPath = new StoragePath(metaClient.getMetaAuxiliaryPath());
+
+ if (!metaClient.getStorage().exists(auxPath)) {
+ // Auxiliary folder doesn't exist - this is valid, nothing to clean up
+ LOG.info("V6 validation passed: Auxiliary folder does not exist");
+ return;
+ }
+
+ // Auxiliary folder exists - validate that REQUESTED compaction files were
cleaned up
+ LOG.info("V6 validation: Checking auxiliary folder cleanup at: {}",
auxPath);
+
+ // Get pending compaction timeline with REQUESTED state (same as upgrade
handler)
+ HoodieTimeline compactionTimeline =
metaClient.getActiveTimeline().filterPendingCompactionTimeline()
+ .filter(instant -> instant.getState() ==
HoodieInstant.State.REQUESTED);
+
+ InstantFileNameGenerator factory =
metaClient.getInstantFileNameGenerator();
+
+ // Validate that none of the REQUESTED compaction files exist in auxiliary
folder
+ compactionTimeline.getInstantsAsStream().forEach(instant -> {
+ StoragePath compactionFile = new
StoragePath(metaClient.getMetaAuxiliaryPath(), factory.getFileName(instant));
+ try {
+ if (metaClient.getStorage().exists(compactionFile)) {
+ throw new AssertionError("V6 validation failed: REQUESTED compaction
file should have been cleaned up but still exists: " + compactionFile);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to check existence of compaction
file: " + compactionFile, e);
+ }
+ });
+
+ LOG.info("V6 validation passed: {} REQUESTED compaction instants verified
to be cleaned up from auxiliary folder",
+ compactionTimeline.countInstants());
+ }
+
+ private void validateVersion8Properties(HoodieTableConfig tableConfig) {
+ Option<TimelineLayoutVersion> layoutVersion =
tableConfig.getTimelineLayoutVersion();
+ assertTrue(layoutVersion.isPresent(), "Timeline layout version should be
present for V8+");
+ assertEquals(TimelineLayoutVersion.LAYOUT_VERSION_2, layoutVersion.get(),
+ "Timeline layout should be V2 for V8+");
+
+ assertTrue(tableConfig.contains(HoodieTableConfig.TIMELINE_PATH),
+ "Timeline path should be set for V8");
+ assertEquals(HoodieTableConfig.TIMELINE_PATH.defaultValue(),
+ tableConfig.getString(HoodieTableConfig.TIMELINE_PATH),
+ "Timeline path should have default value");
+
+ assertTrue(tableConfig.contains(HoodieTableConfig.RECORD_MERGE_MODE),
+ "Record merge mode should be set for V8");
+ RecordMergeMode mergeMode = tableConfig.getRecordMergeMode();
+ assertNotNull(mergeMode, "Merge mode should not be null");
+
+
assertTrue(tableConfig.contains(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID),
+ "Record merge strategy ID should be set for V8");
+
+ assertTrue(tableConfig.contains(HoodieTableConfig.PAYLOAD_CLASS_NAME),
+ "Payload class should be set for V8");
+
+ assertTrue(tableConfig.contains(HoodieTableConfig.INITIAL_VERSION),
+ "Initial version should be set for V8");
+
+ if (tableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)) {
+ assertTrue(tableConfig.contains(HoodieTableConfig.KEY_GENERATOR_TYPE),
+ "Key generator type should be set when key generator class is
present");
+ }
+ }
+
+ private void validateVersion9Properties(HoodieTableMetaClient metaClient,
HoodieTableConfig tableConfig) {
+ validateVersion8Properties(tableConfig);
+
+ // Check if index metadata exists and has proper version information
+ Option<HoodieIndexMetadata> indexMetadata = metaClient.getIndexMetadata();
+ if (indexMetadata.isPresent()) {
+ indexMetadata.get().getIndexDefinitions().forEach((indexName, indexDef)
-> {
+ assertNotNull(indexDef.getVersion(),
+ "Index " + indexName + " should have version information in V9");
+ });
+ }
+ }
+
+ /**
+ * Read table data for validation purposes.
+ */
+ private Dataset<Row> readTableData(HoodieTableMetaClient metaClient, String
stage) {
+ LOG.info("Reading table data {}", stage);
+
+ try {
+ String basePath = metaClient.getBasePath().toString();
+ Dataset<Row> tableData = sqlContext().read()
+ .format("hudi")
+ .load(basePath);
+
+ assertNotNull(tableData, "Table read should not return null " + stage);
+
+ // Force execution to ensure data is read immediately (not lazily)
+ List<Row> rows = tableData.collectAsList();
+ long rowCount = rows.size();
+ assertTrue(rowCount >= 0, "Row count should be non-negative " + stage);
+
+ // Convert collected rows back to Dataset for use in validation
+ Dataset<Row> materializedData = sqlContext().createDataFrame(rows,
tableData.schema());
+
+ LOG.info("Successfully read and materialized table data {} ({} rows)",
stage, rowCount);
+ return materializedData;
+ } catch (Exception e) {
+ LOG.error("Failed to read table data {} from: {} (version: {})",
+ stage, metaClient.getBasePath(),
metaClient.getTableConfig().getTableVersion(), e);
+ throw new RuntimeException("Failed to read table data " + stage, e);
+ }
+ }
+
+ /**
+ * Validate data consistency between original data and data after
upgrade/downgrade.
+ * This ensures that upgrade/downgrade operations preserve data integrity.
+ */
+ private void validateDataConsistency(Dataset<Row> originalData,
HoodieTableMetaClient metaClient, String stage) {
+ LOG.info("Validating data consistency {}", stage);
+
+ try {
+ Dataset<Row> currentData = readTableData(metaClient, stage);
+
+ // Exclude Hudi metadata columns
+ Set<String> hoodieMetadataColumns = new HashSet<>(Arrays.asList(
+ "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
+ "_hoodie_partition_path", "_hoodie_file_name"));
+
+ Set<String> columnsToValidate = Arrays.stream(originalData.columns())
+ .filter(col -> !hoodieMetadataColumns.contains(col))
+ .collect(Collectors.toSet());
+
+ if (columnsToValidate.isEmpty()) {
+ LOG.info("Skipping data consistency validation {} (no business columns
to validate)", stage);
+ return;
+ }
+
+ LOG.info("Validating data columns: {}", columnsToValidate);
+ boolean dataConsistent = areDataframesEqual(originalData, currentData,
columnsToValidate);
+ assertTrue(dataConsistent, " data should be consistent between original
and " + stage + " states");
+
+ LOG.info("Data consistency validation passed {}", stage);
+ } catch (Exception e) {
+ throw new RuntimeException("Data consistency validation failed " +
stage, e);
+ }
+ }
+
+ private void performDataValidationOnTable(HoodieTableMetaClient metaClient,
String stage) {
+ LOG.info("Performing data validation on table {}", stage);
+
+ try {
+ Dataset<Row> tableData = readTableData(metaClient, stage);
+ LOG.info("Data validation passed {} (table accessible, {} rows)", stage,
tableData.count());
+ } catch (Exception e) {
+ throw new RuntimeException("Data validation failed " + stage, e);
+ }
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/README.md
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/README.md
new file mode 100644
index 000000000000..3ccafcd8ff65
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/README.md
@@ -0,0 +1,192 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# Hudi Upgrade/Downgrade Test Fixtures
+
+This directory contains pre-created MOR Hudi tables from different releases
used for testing upgrade/downgrade functionality.
+
+## Fixture Tables
+
+| Directory | Hudi Version | Table Version |
+|-----------|--------------|---------------|
+| `hudi-v4-table/` | 0.11.1 | 4 |
+| `hudi-v5-table/` | 0.12.2 | 5 |
+| `hudi-v6-table/` | 0.14.0 | 6 |
+| `hudi-v8-table/` | 1.0.2 | 8 |
+| `hudi-v9-table/` | 1.1.0 | 9 |
+
+## Table Schema
+
+All fixture tables use a consistent simple schema:
+- `id` (string) - Record identifier
+- `name` (string) - Record name
+- `ts` (long) - Timestamp
+- `partition` (string) - Partition value
+
+## Table Structure
+
+Each fixture table contains:
+- 2-3 base files (parquet)
+- 2-3 log files
+- Multiple committed instants
+- 1 pending/failed write (for rollback testing)
+- Basic .hoodie metadata structure
+
+## Generating Fixtures
+
+### Prerequisites
+- Java 8+ installed
+- Internet connection (for downloading Spark binaries and Hudi bundles via
Maven)
+
+### Generation Process
+
+Use the `generate-fixtures.sh` script to create all fixture tables:
+
+```bash
+./generate-fixtures.sh
+```
+
+**Note**: The script will create fixture tables in the `mor-tables/`
directory. On first run, it downloads and caches Spark binaries in the
`spark-versions/` directory. Each fixture generation may take several minutes
as it downloads Spark binaries and Hudi bundles, then creates table data.
+
+### Script Parameters
+
+The `generate-fixtures.sh` script supports the following parameters:
+
+| Parameter | Description | Required | Example |
+|-----------|-------------|----------|---------|
+| `--version <version_list>` | Comma-separated list of table versions to
generate | No | `--version 4,5,6` |
+| `--hudi-bundle-path <path>` | Path to locally built Hudi bundle JAR
(required for version 9) | Only for version 9 | `--hudi-bundle-path
/path/to/bundle.jar` |
+
+#### Supported Versions
+- **4** - Hudi 0.11.1 (Spark 3.2.4, Scala 2.12)
+- **5** - Hudi 0.12.2 (Spark 3.3.1, Scala 2.12)
+- **6** - Hudi 0.14.0 (Spark 3.4.1, Scala 2.12)
+- **8** - Hudi 1.0.2 (Spark 3.5.4, Scala 2.12)
+- **9** - Hudi 1.1.0 (Spark 3.5.4, Scala 2.12) - **Requires local bundle**
+
+#### Usage Examples
+
+```bash
+# Generate all available versions (4,5,6,8) - version 9 excluded due to local
bundle requirement
+./generate-fixtures.sh
+
+# Generate specific versions only
+./generate-fixtures.sh --version 4,5
+
+# Generate only version 6
+./generate-fixtures.sh --version 6
+
+# Generate version 9 (requires locally built Hudi bundle)
+./generate-fixtures.sh --version 9 --hudi-bundle-path
/path/to/hudi-spark3.5-bundle_2.12-1.1.0-SNAPSHOT.jar
+
+# Generate multiple versions including version 9
+./generate-fixtures.sh --version 4,6,9 --hudi-bundle-path /path/to/bundle.jar
+```
+
+#### Version 9 Special Requirements
+
+Version 9 requires a locally built Hudi bundle since Hudi 1.1.0 is not yet
officially released. To build the bundle:
+
+```bash
+# In your Hudi repository
+cd <hudi-repo>
+mvn clean install -DskipTests -Dspark3.5 -Dscala-2.12 -pl
packaging/hudi-spark-bundle -am
+
+# Then use the generated bundle
+./generate-fixtures.sh --version 9 --hudi-bundle-path
<hudi-repo>/packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.1.0-SNAPSHOT.jar
+```
+
+**Note**: If you try to generate version 9 without providing
`--hudi-bundle-path`, the script will display detailed build instructions and
exit with an error.
+
+### Spark Binaries and Compatibility Matrix
+
+The script downloads and caches official Apache Spark binaries with Hudi
bundles resolved via the `--packages` flag:
+
+| Hudi Version | Table Version | Spark Version | Scala Version | Downloaded
Binary |
+|--------------|---------------|---------------|---------------|-------------------|
+| 0.11.1 | 4 | 3.2.4 | 2.12 |
spark-3.2.4-bin-hadoop3.2.tgz |
+| 0.12.2 | 5 | 3.3.1 | 2.12 |
spark-3.3.1-bin-hadoop3.tgz |
+| 0.14.0 | 6 | 3.4.1 | 2.12 |
spark-3.4.1-bin-hadoop3.tgz |
+| 1.0.2 | 8 | 3.5.4 | 2.12 |
spark-3.5.4-bin-hadoop3.tgz |
+| 1.1.0 | 9 | 3.5.4 | 2.12 |
spark-3.5.4-bin-hadoop3.tgz |
+
+### Manual Generation Example
+
+The script uses a template-based approach with separate Scala files and
variable substitution. Here's how to manually replicate the process:
+
+#### Hudi 0.11.1 (Version 4)
+```bash
+# 1. Download and extract Spark 3.2.4 binary (if not already present)
+mkdir -p spark-versions
+cd spark-versions
+wget
https://archive.apache.org/dist/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz
+tar -xzf spark-3.2.4-bin-hadoop3.2.tgz
+cd ..
+
+# 2. Create custom Scala script from template
+cp scala-templates/generate-fixture.scala /tmp/generate_hudi-v4-table.scala
+
+# 3. Substitute template variables in the copied script
+sed -i.bak \
+  -e 's/${FIXTURE_NAME}/hudi-v4-table/g' \
+  -e 's/${TABLE_NAME}/hudi-v4-table_table/g' \
+  -e 's|${BASE_PATH}|'$(pwd)'/mor-tables/hudi-v4-table|g' \
+  /tmp/generate_hudi-v4-table.scala
+
+# 4. Run spark-shell with the customized Scala script using -i flag
+./spark-versions/spark-3.2.4-bin-hadoop3.2/bin/spark-shell \
+ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
+ --conf
'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
\
+ --conf
'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
+ --conf 'spark.jars.ivy=/tmp/ivy-cache-hudi-v4-table' \
+ --conf 'spark.sql.warehouse.dir=/tmp/spark-warehouse' \
+ --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1 \
+ -i /tmp/generate_hudi-v4-table.scala
+
+# 5. Clean up temporary files
+rm -f /tmp/generate_hudi-v4-table.scala /tmp/generate_hudi-v4-table.scala.bak
+rm -rf /tmp/ivy-cache-hudi-v4-table
+```
+
+**Note**: The Scala code itself is in `scala-templates/generate-fixture.scala`
+and contains the template variables `${FIXTURE_NAME}`, `${TABLE_NAME}`, and
+`${BASE_PATH}`; all three must be substituted (the shell script does this
+automatically) before the script is run, otherwise spark-shell fails to
+compile the string interpolations.
+
+#### Other Versions
+For other versions, use the same template-based pattern but with the
appropriate Spark binary and Hudi bundle version from the compatibility matrix
above. The key differences are:
+
+- **Hudi 0.12.2 (Version 5)**:
+ - Spark binary: `./spark-versions/spark-3.3.1-bin-hadoop3/bin/spark-shell`
+ - Hudi bundle: `--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.2`
+ - Table name: `hudi-v5-table_table`, Base path: `mor-tables/hudi-v5-table`
+
+- **Hudi 0.14.0 (Version 6)**:
+ - Spark binary: `./spark-versions/spark-3.4.1-bin-hadoop3/bin/spark-shell`
+ - Hudi bundle: `--packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0`
+ - Table name: `hudi-v6-table_table`, Base path: `mor-tables/hudi-v6-table`
+
+- **Hudi 1.0.2 (Version 8)**:
+ - Spark binary: `./spark-versions/spark-3.5.4-bin-hadoop3/bin/spark-shell`
+ - Hudi bundle: `--packages org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.2`
+ - Table name: `hudi-v8-table_table`, Base path: `mor-tables/hudi-v8-table`
+
+- **Hudi 1.1.0 (Version 9)**: Requires `--jars <local-bundle-path>` instead of
`--packages` (see version 9 requirements above)
+
+
+## Notes
+
+- Fixtures are copied to temporary directories during testing to avoid
modifications
+- Each fixture should be self-contained with all necessary metadata
+- Keep fixtures minimal but realistic (small data sizes for fast tests)
+- Ensure consistent schema across all versions for compatibility testing
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/generate-fixtures.sh
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/generate-fixtures.sh
new file mode 100755
index 000000000000..526c08327932
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/generate-fixtures.sh
@@ -0,0 +1,341 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Script to generate Hudi MOR table fixtures for different versions
+# Used by TestUpgradeDowngradeFixtures for integration testing
+
set -e

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
FIXTURES_DIR="$SCRIPT_DIR/mor-tables"

echo "Generating Hudi upgrade/downgrade test fixtures..."
echo "Fixtures directory: $FIXTURES_DIR"

# Command-line state: which table versions to build, and an optional local
# bundle JAR (only needed for the unreleased table version 9).
REQUESTED_VERSIONS=""
HUDI_BUNDLE_PATH=""
while [ $# -gt 0 ]; do
  case "$1" in
    --version)
      REQUESTED_VERSIONS="$2"
      shift 2
      ;;
    --hudi-bundle-path)
      HUDI_BUNDLE_PATH="$2"
      shift 2
      ;;
    *)
      echo "Unknown option: $1"
      echo "Usage: $0 [--version <version_list>] [--hudi-bundle-path <path>]"
      echo "  --version <version_list>          Comma-separated list of table versions to generate (e.g., 4,5,6)"
      echo "  --hudi-bundle-path <path>         Path to locally built Hudi bundle JAR (required for version 9)"
      echo ""
      echo "Examples:"
      echo "  $0                                # Generate all versions (except 9)"
      echo "  $0 --version 4,5                  # Generate versions 4 and 5"
      echo "  $0 --version 9 --hudi-bundle-path /path/to/hudi-spark3.5-bundle_2.12-1.1.0-SNAPSHOT.jar"
      echo ""
      echo "Note: Version 9 requires a locally built Hudi bundle from master branch"
      exit 1
      ;;
  esac
done

# Split the comma-separated list into VERSION_ARRAY (consumed by
# should_generate_version below).
if [ -n "$REQUESTED_VERSIONS" ]; then
  IFS=',' read -ra VERSION_ARRAY <<< "$REQUESTED_VERSIONS"
  echo "INFO: Generating specific versions: ${VERSION_ARRAY[*]}"
else
  echo "INFO: Generating all fixture versions"
fi
+
# Returns 0 (true) when the given table version should be generated:
# either no --version filter was supplied, or the version is in the filter.
should_generate_version() {
    local version=$1
    if [ -z "$REQUESTED_VERSIONS" ]; then
        return 0
    fi
    local requested
    for requested in "${VERSION_ARRAY[@]}"; do
        if [ "$requested" = "$version" ]; then
            return 0
        fi
    done
    return 1
}
+
# Version 9 (Hudi 1.1.0) has no released bundle, so it can only be generated
# from a locally built JAR. Fail fast before any downloads start.
if should_generate_version "9"; then
    if [ -z "$HUDI_BUNDLE_PATH" ]; then
        echo "ERROR: Version 9 requires --hudi-bundle-path argument"
        echo ""
        echo "To generate table version 9, you need to:"
        echo "1. Build Hudi from master branch:"
        echo "   cd <hudi-repo>"
        echo "   mvn clean install -DskipTests -Dspark3.5 -Dscala-2.12 -pl packaging/hudi-spark-bundle -am"
        echo ""
        echo "2. Provide the path to the built bundle:"
        echo "   $0 --version 9 --hudi-bundle-path <hudi-repo>/packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.1.0-SNAPSHOT.jar"
        exit 1
    elif [ ! -f "$HUDI_BUNDLE_PATH" ]; then
        echo "ERROR: Hudi bundle not found at: $HUDI_BUNDLE_PATH"
        exit 1
    fi

    echo "Using local Hudi bundle: $HUDI_BUNDLE_PATH"
fi
+
# Ensure the Apache Spark distribution for the given minor version is cached
# under spark-versions/, downloading it on first use.
# Prints the Spark home directory on stdout; ALL status/error messages go to
# stderr, because callers capture stdout via command substitution.
ensure_spark_binary() {
    local spark_version=$1
    local spark_full_version spark_tarball extracted_dirname

    # Exact patch releases — kept in sync with the compatibility matrix in
    # README.md (which documents 3.2.4 / 3.3.1 / 3.4.1 / 3.5.4).
    case "$spark_version" in
        "3.2")
            spark_full_version="3.2.4"
            spark_tarball="spark-3.2.4-bin-hadoop3.2.tgz"
            extracted_dirname="spark-3.2.4-bin-hadoop3.2"
            ;;
        "3.3")
            spark_full_version="3.3.1"
            spark_tarball="spark-3.3.1-bin-hadoop3.tgz"
            extracted_dirname="spark-3.3.1-bin-hadoop3"
            ;;
        "3.4")
            spark_full_version="3.4.1"
            spark_tarball="spark-3.4.1-bin-hadoop3.tgz"
            extracted_dirname="spark-3.4.1-bin-hadoop3"
            ;;
        "3.5")
            spark_full_version="3.5.4"
            spark_tarball="spark-3.5.4-bin-hadoop3.tgz"
            extracted_dirname="spark-3.5.4-bin-hadoop3"
            ;;
        *)
            # Error must not land on stdout: stdout is the return channel.
            echo "ERROR: Unsupported Spark version: $spark_version" >&2
            exit 1
            ;;
    esac

    local spark_dir="$SCRIPT_DIR/spark-versions/spark-${spark_full_version}"
    local spark_bin="$spark_dir/bin/spark-shell"

    # Reuse a previously downloaded distribution.
    if [ -f "$spark_bin" ]; then
        echo "Spark $spark_full_version already available at $spark_dir" >&2
        echo "$spark_dir"
        return 0
    fi

    echo "Downloading Spark $spark_full_version..." >&2

    local download_url="https://archive.apache.org/dist/spark/spark-${spark_full_version}/${spark_tarball}"
    local temp_tarball="/tmp/${spark_tarball}"

    if ! curl -L -o "$temp_tarball" "$download_url"; then
        echo "ERROR: Failed to download Spark $spark_full_version" >&2
        exit 1
    fi

    echo "Extracting Spark $spark_full_version..." >&2
    mkdir -p "$SCRIPT_DIR/spark-versions"

    if ! tar -xzf "$temp_tarball" -C "$SCRIPT_DIR/spark-versions/"; then
        echo "ERROR: Failed to extract Spark $spark_full_version" >&2
        rm -f "$temp_tarball"
        exit 1
    fi

    # Normalize the extracted directory name to spark-<version>.
    local extracted_dir="$SCRIPT_DIR/spark-versions/$extracted_dirname"
    if [ -d "$extracted_dir" ]; then
        mv "$extracted_dir" "$spark_dir"
    fi

    # Sanity check: the tarball layout is an assumption — fail loudly instead
    # of returning a directory without a usable spark-shell.
    if [ ! -f "$spark_bin" ]; then
        echo "ERROR: extraction did not produce $spark_bin" >&2
        rm -f "$temp_tarball"
        exit 1
    fi

    rm -f "$temp_tarball"

    # Small delay to let the filesystem settle before callers probe the path.
    echo "Waiting for filesystem sync..." >&2
    sleep 1

    echo "Spark $spark_full_version ready at $spark_dir" >&2
    echo "$spark_dir"
}
+
# Generate one fixture table.
# Args: hudi_version table_version fixture_name spark_minor_version scala_version
# Downloads/caches Spark, renders the Scala template, and runs spark-shell -i
# to write the MOR table under $FIXTURES_DIR/<fixture_name>.
generate_fixture() {
    local hudi_version=$1
    local table_version=$2
    local fixture_name=$3
    local spark_version=$4
    local scala_version=$5

    echo "📊 Generating fixture: $fixture_name (Hudi $hudi_version, Table Version $table_version)"
    echo "   Using Spark $spark_version with Scala $scala_version"

    local fixture_path="$FIXTURES_DIR/$fixture_name"

    # Clean existing fixture directory to prevent table type conflicts.
    if [ -d "$fixture_path" ]; then
        echo "Cleaning existing fixture directory: $fixture_path"
        rm -rf "$fixture_path"
    fi

    mkdir -p "$fixture_path"

    # Declare and assign separately: `local x=$(cmd)` would mask cmd's exit
    # status (ShellCheck SC2155) and defeat `set -e` if the download fails.
    local spark_home
    spark_home=$(ensure_spark_binary "$spark_version")
    echo "DEBUG: spark_home=[$spark_home]"

    # Validate Spark installation before spending time on the shell startup.
    if [ ! -f "$spark_home/bin/spark-shell" ]; then
        echo "ERROR: spark-shell not found at $spark_home/bin/spark-shell"
        echo "   Directory contents:"
        ls -la "$spark_home/bin/" || echo "   Directory does not exist"
        exit 1
    fi
    echo "Validated spark-shell exists at: $spark_home/bin/spark-shell"

    # Render the Scala template: substitute all three template variables.
    local table_name="${fixture_name}_table"
    local temp_script="/tmp/generate_${fixture_name}.scala"
    cp "$SCRIPT_DIR/scala-templates/generate-fixture.scala" "$temp_script"
    sed -i.bak \
        -e "s/\${FIXTURE_NAME}/$fixture_name/g" \
        -e "s/\${TABLE_NAME}/$table_name/g" \
        -e "s|\${BASE_PATH}|$fixture_path|g" \
        "$temp_script"
    rm -f "${temp_script}.bak"

    echo "Running Spark shell directly from: $spark_home"

    # Per-fixture Ivy cache avoids permission issues and cross-run pollution.
    local ivy_cache_dir="/tmp/ivy-cache-${fixture_name}"
    mkdir -p "$ivy_cache_dir"

    # Shared spark-shell arguments; only the Hudi bundle source differs:
    # version 9 is unreleased and must come from a locally built JAR (--jars),
    # everything else resolves from Maven Central (--packages).
    local spark_args=(
        --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
        --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
        --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
        --conf "spark.jars.ivy=$ivy_cache_dir"
        --conf 'spark.sql.warehouse.dir=/tmp/spark-warehouse'
    )
    if [ "$table_version" = "9" ]; then
        echo "Using local Hudi bundle: $HUDI_BUNDLE_PATH"
        spark_args+=(--jars "$HUDI_BUNDLE_PATH")
    else
        local hudi_bundle="org.apache.hudi:hudi-spark${spark_version}-bundle_${scala_version}:${hudi_version}"
        echo "Using Hudi bundle: $hudi_bundle"
        spark_args+=(--packages "$hudi_bundle")
    fi
    "$spark_home/bin/spark-shell" "${spark_args[@]}" -i "$temp_script"

    # Clean up the Ivy cache and the rendered script.
    rm -rf "$ivy_cache_dir"
    rm -f "$temp_script"

    echo "Fixture $fixture_name generated successfully"
}
+
# Generate fixtures for each version using appropriate Spark versions.
echo "Generating fixtures for all supported versions..."

# Hudi -> Spark compatibility (defaults):
#   1.0.x -> 3.5.x, 0.15.x -> 3.5.x, 0.14.x -> 3.4.x,
#   0.13.x -> 3.3.x, 0.12.x -> 3.3.x, 0.11.x -> 3.2.x

echo "Hudi Version -> Spark Version -> Scala Version mapping:"

# One spec per fixture: hudi_version:table_version:fixture_name:spark:scala
FIXTURE_SPECS=(
    "0.11.1:4:hudi-v4-table:3.2:2.12"
    "0.12.2:5:hudi-v5-table:3.3:2.12"
    "0.14.0:6:hudi-v6-table:3.4:2.12"
    "1.0.2:8:hudi-v8-table:3.5:2.12"
    "1.1.0:9:hudi-v9-table:3.5:2.12"
)

for spec in "${FIXTURE_SPECS[@]}"; do
    IFS=':' read -r hudi_ver table_ver fixture spark_ver scala_ver <<< "$spec"
    if should_generate_version "$table_ver"; then
        if [ "$table_ver" = "9" ]; then
            # Version 9 is only buildable from a locally built bundle.
            echo "  $hudi_ver -> Spark $spark_ver -> Scala $scala_ver (using local bundle)"
        else
            echo "  $hudi_ver -> Spark $spark_ver -> Scala $scala_ver"
        fi
        generate_fixture "$hudi_ver" "$table_ver" "$fixture" "$spark_ver" "$scala_ver"
    fi
done


echo ""
echo "Fixture generation completed!"
+
# Compress fixture tables to save space; abort on the first failure.
echo ""
echo "Compressing fixture tables..."
for fixture_dir in "$FIXTURES_DIR"/hudi-v*-table; do
    if [ -d "$fixture_dir" ]; then
        fixture_name=$(basename "$fixture_dir")
        echo "Compressing $fixture_name..."
        # Run zip inside the `if` condition: with `set -e`, a bare failing
        # command would abort the script before a later `$?` check ever ran,
        # making the error branch dead code.
        if (cd "$FIXTURES_DIR" && zip -r -q -X "${fixture_name}.zip" "$fixture_name"); then
            # Only remove the uncompressed table once the archive exists.
            rm -rf "$fixture_dir"
            echo "Created ${fixture_name}.zip"
        else
            echo "ERROR: Failed to compress $fixture_name"
            exit 1
        fi
    fi
done

echo ""
echo "Compression completed!"
echo "Generated compressed fixtures:"
find "$FIXTURES_DIR" -name "hudi-v*-table.zip" | sort
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v4-table.zip
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v4-table.zip
new file mode 100644
index 000000000000..154efa99c8c1
Binary files /dev/null and
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v4-table.zip
differ
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v5-table.zip
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v5-table.zip
new file mode 100644
index 000000000000..0e6c5833b69f
Binary files /dev/null and
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v5-table.zip
differ
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v6-table.zip
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v6-table.zip
new file mode 100644
index 000000000000..b571f1efde12
Binary files /dev/null and
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v6-table.zip
differ
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v8-table.zip
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v8-table.zip
new file mode 100644
index 000000000000..d93dca2ebbe8
Binary files /dev/null and
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v8-table.zip
differ
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v9-table.zip
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v9-table.zip
new file mode 100644
index 000000000000..33dc309c3443
Binary files /dev/null and
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/mor-tables/hudi-v9-table.zip
differ
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/scala-templates/generate-fixture.scala
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/scala-templates/generate-fixture.scala
new file mode 100644
index 000000000000..fbefed9e05df
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/scala-templates/generate-fixture.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.hudi.DataSourceWriteOptions._
import spark.implicits._

// Template variables (${TABLE_NAME}, ${BASE_PATH}, ${FIXTURE_NAME}) are
// substituted by generate-fixtures.sh (sed) before this script is run.
val tableName = "${TABLE_NAME}"
val basePath = "${BASE_PATH}"

// Writes one batch with the shared fixture options. Tiny parquet size limits
// (50KB max / 25KB small-file threshold) force multiple base files even with
// this small data set; extracted once so all three batches stay consistent.
def writeBatch(df: DataFrame, operation: String, mode: SaveMode): Unit = {
  df.write.format("hudi").
    option(PRECOMBINE_FIELD.key, "ts").
    option(RECORDKEY_FIELD.key, "id").
    option(PARTITIONPATH_FIELD.key, "partition").
    option("hoodie.table.name", tableName).
    option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
    option("hoodie.datasource.write.operation", operation).
    option("hoodie.index.bloom.num_entries", "20").
    option("hoodie.bloom.index.false.positive.rate", "0.1").
    option("hoodie.parquet.max.file.size", "51200"). // 50KB max file size
    option("hoodie.parquet.small.file.limit", "25600"). // 25KB small file threshold
    option("hoodie.parquet.compression.codec", "snappy").
    mode(mode).
    save(basePath)
}

// Batch 1: initial inserts (creates base parquet files).
val testData = Seq(
  ("id1", "Alice", 1000L, "2023-01-01"),
  ("id2", "Bob", 2000L, "2023-01-01"),
  ("id3", "Charlie", 3000L, "2023-01-02"),
  ("id4", "David", 4000L, "2023-01-02"),
  ("id5", "Eve", 5000L, "2023-01-03")
)
writeBatch(testData.toDF("id", "name", "ts", "partition"), "insert", SaveMode.Overwrite)
println("Initial batch written")

// Batch 2: upserts of existing keys plus one new record (creates MOR log files).
val updateData = Seq(
  ("id1", "Alice_Updated", 1001L, "2023-01-01"),
  ("id2", "Bob_Updated", 2001L, "2023-01-01"),
  ("id6", "Frank", 6000L, "2023-01-03")
)
writeBatch(updateData.toDF("id", "name", "ts", "partition"), "upsert", SaveMode.Append)
println("Update batch written")

// Batch 3: a second insert commit so the timeline has multiple committed instants.
val insertData = Seq(
  ("id7", "Grace", 7000L, "2023-01-04"),
  ("id8", "Henry", 8000L, "2023-01-04")
)
writeBatch(insertData.toDF("id", "name", "ts", "partition"), "insert", SaveMode.Append)
println("Additional insert written")
println(s"Fixture ${FIXTURE_NAME} generated successfully!")

// Exit explicitly: spark-shell -i would otherwise drop into an interactive prompt.
System.exit(0)