This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d424808c6802dc405f2789a1d0da82e10b2bf6ca
Author: Lin Liu <[email protected]>
AuthorDate: Thu Oct 23 09:56:44 2025 -0700

    fix: Handle missing valueType column after upgrade (#14105)
    
    
    ---------
    
    Co-authored-by: Lokesh Jain <[email protected]>
---
 .../java/org/apache/hudi/stats/ValueMetadata.java  |   5 +
 .../hudi/table/upgrade/TestUpgradeDowngrade.java   |  17 ++-
 .../cow-tables/hudi-v8-table-cow.zip               | Bin 0 -> 380008 bytes
 .../generate-fixtures.sh                           |   3 +
 .../scala-templates/generate-fixture-cow.scala     | 153 +++++++++++++++++++++
 5 files changed, 171 insertions(+), 7 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/stats/ValueMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/stats/ValueMetadata.java
index 5c1e087b326f..cebcb2aac3ba 100644
--- a/hudi-common/src/main/java/org/apache/hudi/stats/ValueMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/stats/ValueMetadata.java
@@ -214,6 +214,11 @@ public class ValueMetadata implements Serializable {
       throw new IllegalStateException("ColumnStatsMetadata is null. Handling 
should happen in the caller.");
     }
 
+    // This may happen when the record is from old table versions.
+    if (!columnStatsRecord.hasField(COLUMN_STATS_FIELD_VALUE_TYPE)) {
+      return V1EmptyMetadata.get();
+    }
+
     GenericRecord valueTypeInfo = (GenericRecord) 
columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_TYPE);
     if (valueTypeInfo == null) {
       return V1EmptyMetadata.get();
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 5407cd8fda87..5ae4fc222a1c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -82,6 +82,8 @@ public class TestUpgradeDowngrade extends 
SparkClientFunctionalTestHarness {
       return "/upgrade-downgrade-fixtures/complex-keygen-tables/";
     } else if (suffix.contains("mor")) {
       return "/upgrade-downgrade-fixtures/mor-tables/";
+    } else if (suffix.contains("cow")) {
+      return "/upgrade-downgrade-fixtures/cow-tables/";
     } else if (suffix.contains("payload")) {
       return "/upgrade-downgrade-fixtures/payload-tables/";
     } else {
@@ -92,12 +94,12 @@ public class TestUpgradeDowngrade extends 
SparkClientFunctionalTestHarness {
   @Disabled
   @ParameterizedTest
   @MethodSource("upgradeDowngradeVersionPairs")
-  public void testUpgradeOrDowngrade(HoodieTableVersion fromVersion, 
HoodieTableVersion toVersion) throws Exception {
+  public void testUpgradeOrDowngrade(HoodieTableVersion fromVersion, 
HoodieTableVersion toVersion, String suffix) throws Exception {
     boolean isUpgrade = fromVersion.lesserThan(toVersion);
     String operation = isUpgrade ? "upgrade" : "downgrade";
     LOG.info("Testing {} from version {} to {}", operation, fromVersion, 
toVersion);
     
-    HoodieTableMetaClient originalMetaClient = loadFixtureTable(fromVersion, 
"-mor");
+    HoodieTableMetaClient originalMetaClient = loadFixtureTable(fromVersion, 
suffix);
     assertEquals(fromVersion, 
originalMetaClient.getTableConfig().getTableVersion(),
         "Fixture table should be at expected version");
     
@@ -110,7 +112,7 @@ public class TestUpgradeDowngrade extends 
SparkClientFunctionalTestHarness {
     
     // Confirm that there are log files before rollback and compaction 
operations
     if (isRollbackAndCompactTransition(fromVersion, toVersion)) {
-      validateLogFilesCount(originalMetaClient, operation, true);
+      validateLogFilesCount(originalMetaClient, operation, 
suffix.equals("-mor"));
     }
     
     new UpgradeDowngrade(originalMetaClient, config, context(), 
SparkUpgradeDowngradeHelper.getInstance())
@@ -517,12 +519,13 @@ public class TestUpgradeDowngrade extends 
SparkClientFunctionalTestHarness {
   private static Stream<Arguments> upgradeDowngradeVersionPairs() {
     return Stream.of(
         // Upgrade test cases for six and greater
-        Arguments.of(HoodieTableVersion.SIX, HoodieTableVersion.EIGHT),   // 
V6 -> V8
-        Arguments.of(HoodieTableVersion.EIGHT, HoodieTableVersion.NINE),  // 
V8 -> V9
+        Arguments.of(HoodieTableVersion.SIX, HoodieTableVersion.EIGHT, 
"-mor"),   // V6 -> V8
+        Arguments.of(HoodieTableVersion.EIGHT, HoodieTableVersion.NINE, 
"-mor"),  // V8 -> V9
+        Arguments.of(HoodieTableVersion.EIGHT, HoodieTableVersion.NINE, 
"-cow"),  // V8 -> V9
 
         // Downgrade test cases til six
-        Arguments.of(HoodieTableVersion.NINE, HoodieTableVersion.EIGHT),  // 
V9 -> V8
-        Arguments.of(HoodieTableVersion.EIGHT, HoodieTableVersion.SIX)   // V8 
-> V6
+        Arguments.of(HoodieTableVersion.NINE, HoodieTableVersion.EIGHT, 
"-mor"),  // V9 -> V8
+        Arguments.of(HoodieTableVersion.EIGHT, HoodieTableVersion.SIX, "-mor") 
  // V8 -> V6
     );
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/cow-tables/hudi-v8-table-cow.zip
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/cow-tables/hudi-v8-table-cow.zip
new file mode 100644
index 000000000000..54e4ff3240e1
Binary files /dev/null and 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/cow-tables/hudi-v8-table-cow.zip
 differ
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/generate-fixtures.sh
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/generate-fixtures.sh
index 9b08d00881d0..ac5eff4e558d 100755
--- 
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/generate-fixtures.sh
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/generate-fixtures.sh
@@ -71,6 +71,9 @@ SCRIPT_SUFFIX="${SCRIPT_BASE#generate-fixture}"  # Remove 
generate-fixture prefi
 if [[ "$SCALA_SCRIPT_NAME" == *"mor"* ]]; then
     FIXTURES_DIR="$SCRIPT_DIR/mor-tables"
     echo "Using mor tables directory: $FIXTURES_DIR"
+elif [[ "$SCALA_SCRIPT_NAME" == *"cow"* ]]; then
+    FIXTURES_DIR="$SCRIPT_DIR/cow-tables"
+    echo "Using cow tables directory: $FIXTURES_DIR"
 elif [[ "$SCALA_SCRIPT_NAME" == *"complex-keygen"* ]]; then
     FIXTURES_DIR="$SCRIPT_DIR/complex-keygen-tables"
     echo "Using complex-keygen tables directory: $FIXTURES_DIR"
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/scala-templates/generate-fixture-cow.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/scala-templates/generate-fixture-cow.scala
new file mode 100644
index 000000000000..2c8e445f8a1f
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/upgrade-downgrade-fixtures/scala-templates/generate-fixture-cow.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.spark.sql.SaveMode
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.DataSourceWriteOptions._
+import spark.implicits._
+
+val tableName = "${TABLE_NAME}"  // Backticks for SQL identifiers with hyphens
+val basePath = "${BASE_PATH}"
+
+println("Generating cow table with archival and clustering...")
+
+// Base cow settings used by all steps
+val cowConfig = Map(
+  "hoodie.compact.inline" -> "false",
+  "hoodie.clustering.inline" -> "true",
+  "hoodie.clustering.inline.max.commits" -> "4",
+  "hoodie.metadata.compact.max.delta.commits" -> "3",
+  "hoodie.keep.min.commits" -> "5",
+  "hoodie.keep.max.commits" -> "6",
+  "hoodie.cleaner.commits.retained" -> "5"
+)
+
+// Initial setup (table creation + file size settings for steps 1-2)
+val initialSetupConfig = Map(
+  HoodieTableConfig.PRECOMBINE_FIELD.key -> "ts",
+  RECORDKEY_FIELD.key -> "id",
+  PARTITIONPATH_FIELD.key -> "partition",
+  "hoodie.table.name" -> tableName,
+  "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
+  "hoodie.parquet.max.file.size" -> "2048", // 2KB - very small files
+  "hoodie.parquet.small.file.limit" -> "1024", // 1KB threshold
+  "hoodie.clustering.plan.strategy.small.file.limit" -> "10240", // 10KB
+  "hoodie.clustering.plan.strategy.target.file.max.bytes" -> "10240" // 10KB 
target
+)
+
+// Archival settings
+val archivalConfig = Map(
+  "hoodie.archive.automatic" -> "true",
+  "hoodie.commits.archival.batch" -> "1",
+  "hoodie.archive.merge.files.batch.size" -> "1"
+)
+
+val initialData = Seq(
+  ("id1", "Alice", 1000L, "2023-01-01"),
+  ("id2", "Bob", 1001L, "2023-01-01"),
+  ("id3", "Charlie", 1002L, "2023-01-01"),
+  ("id4", "David", 1003L, "2023-01-02"),
+  ("id5", "Eve", 1004L, "2023-01-02")
+)
+
+val initialDf = initialData.toDF("id", "name", "ts", "partition")
+
+(initialDf.write.format("hudi").
+  options(initialSetupConfig ++ cowConfig).
+  option("hoodie.datasource.write.operation", "insert").
+  mode(SaveMode.Overwrite)).
+  save(basePath)
+
+
+println("Step 1: Initial data written as table")
+
+val moreData1 = Seq(
+  ("id6", "Frank", 2000L, "2023-01-01"),
+  ("id7", "Grace", 2001L, "2023-01-01")
+)
+
+((moreData1.toDF("id", "name", "ts", "partition").write.format("hudi").
+  options(initialSetupConfig ++ cowConfig).
+  option("hoodie.datasource.write.operation", "insert").
+  mode(SaveMode.Append))).
+  save(basePath)
+
+
+println("Step 2: Added more small files")
+
+val update1 = Seq(("id1", "Alice_v2", 3000L, "2023-01-01"))
+update1.toDF("id", "name", "ts", "partition").write.format("hudi").
+  options(cowConfig).
+  option("hoodie.datasource.write.operation", "upsert").
+  mode(SaveMode.Append).
+  save(basePath)
+
+
+println("Step 3: First update - will trigger COMPACTION after 3 delta commits 
(max.delta.commits=3)")
+
+val update2 = Seq(("id2", "Bob_v2", 4000L, "2023-01-01"))
+update2.toDF("id", "name", "ts", "partition").write.format("hudi").
+  options(cowConfig).
+  option("hoodie.datasource.write.operation", "upsert").
+  mode(SaveMode.Append).
+  save(basePath)
+
+
+println("Step 4: Second update - will trigger CLUSTERING after 4 commits 
(max.commits=4)")
+
+val finalData = Seq(("id8", "Final", 5000L, "2023-01-01"))
+finalData.toDF("id", "name", "ts", "partition").write.format("hudi").
+  options(cowConfig ++ archivalConfig).
+  option("hoodie.datasource.write.operation", "insert").
+  mode(SaveMode.Append).
+  save(basePath)
+
+
+println("Step 5: Insert - will trigger CLEANING (retained=5) and begin 
ARCHIVAL setup")
+
+val extraData = Seq(("id9", "Extra", 6000L, "2023-01-01"))
+extraData.toDF("id", "name", "ts", "partition").write.format("hudi").
+  options(cowConfig ++ archivalConfig).
+  option("hoodie.datasource.write.operation", "insert").
+  mode(SaveMode.Append).
+  save(basePath)
+
+println("Step 6: Extra insert - will trigger ARCHIVAL (keep.max.commits=6 
exceeded)")
+
+val moreExtraData = Seq(("id10", "MoreExtra", 7000L, "2023-01-01"))
+moreExtraData.toDF("id", "name", "ts", "partition").write.format("hudi").
+  options(cowConfig ++ archivalConfig).
+  option("hoodie.datasource.write.operation", "insert").
+  mode(SaveMode.Append).
+  save(basePath)
+
+println("Step 7: More extra insert - ensures ARCHIVAL is completed")
+
+val deleteData = Seq(("id1", "Alice_v2", 9000L, "2023-01-01"))
+deleteData.toDF("id", "name", "ts", "partition").write.format("hudi").
+  option("hoodie.datasource.write.operation", "delete").
+  option("hoodie.compact.inline", "false"). // Disable compaction to keep log 
files uncompacted
+  option("hoodie.clustering.inline", "false").
+  option("hoodie.clean.automatic", "false").
+  option("hoodie.archive.automatic", "false").
+  mode(SaveMode.Append).
+  save(basePath)
+
+println("Step 8: Delete operation (creates uncompacted log files)")
+
+println(s"cow table fixture ${FIXTURE_NAME} generated!")
+System.exit(0)
\ No newline at end of file

Reply via email to