This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d424808c6802dc405f2789a1d0da82e10b2bf6ca Author: Lin Liu <[email protected]> AuthorDate: Thu Oct 23 09:56:44 2025 -0700 fix: Handle missing valueType column after upgrade (#14105) --------- Co-authored-by: Lokesh Jain <[email protected]> --- .../java/org/apache/hudi/stats/ValueMetadata.java | 5 + .../hudi/table/upgrade/TestUpgradeDowngrade.java | 17 ++- .../cow-tables/hudi-v8-table-cow.zip | Bin 0 -> 380008 bytes .../generate-fixtures.sh | 3 + .../scala-templates/generate-fixture-cow.scala | 153 +++++++++++++++++++++ 5 files changed, 171 insertions(+), 7 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/stats/ValueMetadata.java b/hudi-common/src/main/java/org/apache/hudi/stats/ValueMetadata.java index 5c1e087b326f..cebcb2aac3ba 100644 --- a/hudi-common/src/main/java/org/apache/hudi/stats/ValueMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/stats/ValueMetadata.java @@ -214,6 +214,11 @@ public class ValueMetadata implements Serializable { throw new IllegalStateException("ColumnStatsMetadata is null. Handling should happen in the caller."); } + // This may happen when the record is from old table versions. 
+ if (!columnStatsRecord.hasField(COLUMN_STATS_FIELD_VALUE_TYPE)) { + return V1EmptyMetadata.get(); + } + GenericRecord valueTypeInfo = (GenericRecord) columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_TYPE); if (valueTypeInfo == null) { return V1EmptyMetadata.get(); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java index 5407cd8fda87..5ae4fc222a1c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java @@ -82,6 +82,8 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness { return "/upgrade-downgrade-fixtures/complex-keygen-tables/"; } else if (suffix.contains("mor")) { return "/upgrade-downgrade-fixtures/mor-tables/"; + } else if (suffix.contains("cow")) { + return "/upgrade-downgrade-fixtures/cow-tables/"; } else if (suffix.contains("payload")) { return "/upgrade-downgrade-fixtures/payload-tables/"; } else { @@ -92,12 +94,12 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness { @Disabled @ParameterizedTest @MethodSource("upgradeDowngradeVersionPairs") - public void testUpgradeOrDowngrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion) throws Exception { + public void testUpgradeOrDowngrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String suffix) throws Exception { boolean isUpgrade = fromVersion.lesserThan(toVersion); String operation = isUpgrade ? 
"upgrade" : "downgrade"; LOG.info("Testing {} from version {} to {}", operation, fromVersion, toVersion); - HoodieTableMetaClient originalMetaClient = loadFixtureTable(fromVersion, "-mor"); + HoodieTableMetaClient originalMetaClient = loadFixtureTable(fromVersion, suffix); assertEquals(fromVersion, originalMetaClient.getTableConfig().getTableVersion(), "Fixture table should be at expected version"); @@ -110,7 +112,7 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness { // Confirm that there are log files before rollback and compaction operations if (isRollbackAndCompactTransition(fromVersion, toVersion)) { - validateLogFilesCount(originalMetaClient, operation, true); + validateLogFilesCount(originalMetaClient, operation, suffix.equals("-mor")); } new UpgradeDowngrade(originalMetaClient, config, context(), SparkUpgradeDowngradeHelper.getInstance()) @@ -517,12 +519,13 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness { private static Stream<Arguments> upgradeDowngradeVersionPairs() { return Stream.of( // Upgrade test cases for six and greater - Arguments.of(HoodieTableVersion.SIX, HoodieTableVersion.EIGHT), // V6 -> V8 - Arguments.of(HoodieTableVersion.EIGHT, HoodieTableVersion.NINE), // V8 -> V9 + Arguments.of(HoodieTableVersion.SIX, HoodieTableVersion.EIGHT, "-mor"), // V6 -> V8 + Arguments.of(HoodieTableVersion.EIGHT, HoodieTableVersion.NINE, "-mor"), // V8 -> V9 + Arguments.of(HoodieTableVersion.EIGHT, HoodieTableVersion.NINE, "-cow"), // V8 -> V9 // Downgrade test cases til six - Arguments.of(HoodieTableVersion.NINE, HoodieTableVersion.EIGHT), // V9 -> V8 - Arguments.of(HoodieTableVersion.EIGHT, HoodieTableVersion.SIX) // V8 -> V6 + Arguments.of(HoodieTableVersion.NINE, HoodieTableVersion.EIGHT, "-mor"), // V9 -> V8 + Arguments.of(HoodieTableVersion.EIGHT, HoodieTableVersion.SIX, "-mor") // V8 -> V6 ); } diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/cow-tables/hudi-v8-table-cow.zip b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/cow-tables/hudi-v8-table-cow.zip new file mode 100644 index 000000000000..54e4ff3240e1 Binary files /dev/null and b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/cow-tables/hudi-v8-table-cow.zip differ diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/generate-fixtures.sh b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/generate-fixtures.sh index 9b08d00881d0..ac5eff4e558d 100755 --- a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/generate-fixtures.sh +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/generate-fixtures.sh @@ -71,6 +71,9 @@ SCRIPT_SUFFIX="${SCRIPT_BASE#generate-fixture}" # Remove generate-fixture prefi if [[ "$SCALA_SCRIPT_NAME" == *"mor"* ]]; then FIXTURES_DIR="$SCRIPT_DIR/mor-tables" echo "Using mor tables directory: $FIXTURES_DIR" +elif [[ "$SCALA_SCRIPT_NAME" == *"cow"* ]]; then + FIXTURES_DIR="$SCRIPT_DIR/cow-tables" + echo "Using cow tables directory: $FIXTURES_DIR" elif [[ "$SCALA_SCRIPT_NAME" == *"complex-keygen"* ]]; then FIXTURES_DIR="$SCRIPT_DIR/complex-keygen-tables" echo "Using complex-keygen tables directory: $FIXTURES_DIR" diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/scala-templates/generate-fixture-cow.scala b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/scala-templates/generate-fixture-cow.scala new file mode 100644 index 000000000000..2c8e445f8a1f --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/scala-templates/generate-fixture-cow.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.spark.sql.SaveMode +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.DataSourceWriteOptions._ +import spark.implicits._ + +val tableName = "${TABLE_NAME}" // Backticks for SQL identifiers with hyphens +val basePath = "${BASE_PATH}" + +println("Generating cow table with archival and clustering...") + +// Base cow settings used by all steps +val cowConfig = Map( + "hoodie.compact.inline" -> "false", + "hoodie.clustering.inline" -> "true", + "hoodie.clustering.inline.max.commits" -> "4", + "hoodie.metadata.compact.max.delta.commits" -> "3", + "hoodie.keep.min.commits" -> "5", + "hoodie.keep.max.commits" -> "6", + "hoodie.cleaner.commits.retained" -> "5" +) + +// Initial setup (table creation + file size settings for steps 1-2) +val initialSetupConfig = Map( + HoodieTableConfig.PRECOMBINE_FIELD.key -> "ts", + RECORDKEY_FIELD.key -> "id", + PARTITIONPATH_FIELD.key -> "partition", + "hoodie.table.name" -> tableName, + "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE", + "hoodie.parquet.max.file.size" -> "2048", // 2KB - very small files + "hoodie.parquet.small.file.limit" -> "1024", // 1KB threshold + "hoodie.clustering.plan.strategy.small.file.limit" -> "10240", // 10KB + 
"hoodie.clustering.plan.strategy.target.file.max.bytes" -> "10240" // 10KB target +) + +// Archival settings +val archivalConfig = Map( + "hoodie.archive.automatic" -> "true", + "hoodie.commits.archival.batch" -> "1", + "hoodie.archive.merge.files.batch.size" -> "1" +) + +val initialData = Seq( + ("id1", "Alice", 1000L, "2023-01-01"), + ("id2", "Bob", 1001L, "2023-01-01"), + ("id3", "Charlie", 1002L, "2023-01-01"), + ("id4", "David", 1003L, "2023-01-02"), + ("id5", "Eve", 1004L, "2023-01-02") +) + +val initialDf = initialData.toDF("id", "name", "ts", "partition") + +(initialDf.write.format("hudi"). + options(initialSetupConfig ++ cowConfig). + option("hoodie.datasource.write.operation", "insert"). + mode(SaveMode.Overwrite)). + save(basePath) + + +println("Step 1: Initial data written as table") + +val moreData1 = Seq( + ("id6", "Frank", 2000L, "2023-01-01"), + ("id7", "Grace", 2001L, "2023-01-01") +) + +((moreData1.toDF("id", "name", "ts", "partition").write.format("hudi"). + options(initialSetupConfig ++ cowConfig). + option("hoodie.datasource.write.operation", "insert"). + mode(SaveMode.Append))). + save(basePath) + + +println("Step 2: Added more small files") + +val update1 = Seq(("id1", "Alice_v2", 3000L, "2023-01-01")) +update1.toDF("id", "name", "ts", "partition").write.format("hudi"). + options(cowConfig). + option("hoodie.datasource.write.operation", "upsert"). + mode(SaveMode.Append). + save(basePath) + + +println("Step 3: First update - will trigger COMPACTION after 3 delta commits (max.delta.commits=3)") + +val update2 = Seq(("id2", "Bob_v2", 4000L, "2023-01-01")) +update2.toDF("id", "name", "ts", "partition").write.format("hudi"). + options(cowConfig). + option("hoodie.datasource.write.operation", "upsert"). + mode(SaveMode.Append). 
+ save(basePath) + + + println("Step 4: Second update - will trigger CLUSTERING after 4 commits (max.commits=4)") + + val finalData = Seq(("id8", "Final", 5000L, "2023-01-01")) + finalData.toDF("id", "name", "ts", "partition").write.format("hudi"). + options(cowConfig ++ archivalConfig). + option("hoodie.datasource.write.operation", "insert"). + mode(SaveMode.Append). + save(basePath) + + + println("Step 5: Insert - will trigger CLEANING (retained=5) and begin ARCHIVAL setup") + + val extraData = Seq(("id9", "Extra", 6000L, "2023-01-01")) + extraData.toDF("id", "name", "ts", "partition").write.format("hudi"). + options(cowConfig ++ archivalConfig). + option("hoodie.datasource.write.operation", "insert"). + mode(SaveMode.Append). + save(basePath) + + println("Step 6: Extra insert - will trigger ARCHIVAL (keep.max.commits=6 exceeded)") + + val moreExtraData = Seq(("id10", "MoreExtra", 7000L, "2023-01-01")) + moreExtraData.toDF("id", "name", "ts", "partition").write.format("hudi"). + options(cowConfig ++ archivalConfig). + option("hoodie.datasource.write.operation", "insert"). + mode(SaveMode.Append). + save(basePath) + + println("Step 7: More extra insert - ensures ARCHIVAL is completed") + + val deleteData = Seq(("id1", "Alice_v2", 9000L, "2023-01-01")) + deleteData.toDF("id", "name", "ts", "partition").write.format("hudi"). + option("hoodie.datasource.write.operation", "delete"). + option("hoodie.compact.inline", "false"). // Disable table services so the delete remains the final commit (COW data tables have no log files) + option("hoodie.clustering.inline", "false"). + option("hoodie.clean.automatic", "false"). + option("hoodie.archive.automatic", "false"). + mode(SaveMode.Append). + save(basePath) + + println("Step 8: Delete operation (final commit, table services disabled)") + + println(s"cow table fixture ${FIXTURE_NAME} generated!") + System.exit(0) \ No newline at end of file
