This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f3b19cc4d71a [MINOR] TableChanges class throw schema evolution exception (#13581)
f3b19cc4d71a is described below

commit f3b19cc4d71af58670e9b1dbc0050b6e958ad2fc
Author: Davis-Zhang-Onehouse <[email protected]>
AuthorDate: Sat Jul 19 18:32:28 2025 -0700

    [MINOR] TableChanges class throw schema evolution exception (#13581)
---
 .../hudi/internal/schema/action/TableChanges.java  | 47 ++++++++++++----------
 .../hudi/table/ITTestSchemaEvolutionBySQL.java     |  6 ++-
 .../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala  |  9 ++---
 3 files changed, 33 insertions(+), 29 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChanges.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChanges.java
index d039e2e2ed5a..cd1cdb289951 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChanges.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChanges.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.internal.schema.action;
 
+import org.apache.hudi.exception.SchemaCompatibilityException;
 import org.apache.hudi.internal.schema.HoodieSchemaException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.InternalSchemaBuilder;
@@ -83,20 +84,21 @@ public class TableChanges {
      * @param name name of the column to update
      * @param newType new type for the column
      * @return this
-     * @throws IllegalArgumentException
+     * @throws SchemaCompatibilityException
      */
     public ColumnUpdateChange updateColumnType(String name, Type newType) {
       checkColModifyIsLegal(name);
       if (newType.isNestedType()) {
-        throw new IllegalArgumentException(String.format("only support update 
primitive type but find nest column: %s", name));
+        throw new SchemaCompatibilityException(String.format("Cannot update 
column '%s' to nested type '%s'.", name, newType));
       }
       Types.Field field = internalSchema.findField(name);
       if (field == null) {
-        throw new IllegalArgumentException(String.format("cannot update a 
missing column: %s", name));
+        throw new SchemaCompatibilityException(String.format("Cannot update 
type for column '%s' because it does not exist in the schema", name));
       }
 
       if (!SchemaChangeUtils.isTypeUpdateAllow(field.type(), newType)) {
-        throw new IllegalArgumentException(String.format("cannot update origin 
type: %s to a incompatibility type: %s", field.type(), newType));
+        throw new SchemaCompatibilityException(String.format(
+            "Cannot update column '%s' from type '%s' to incompatible type 
'%s'.", name, field.type(), newType));
       }
 
       if (field.type().equals(newType)) {
@@ -119,13 +121,13 @@ public class TableChanges {
      * @param name name of the column to update
      * @param newDoc new documentation for the column
      * @return this
-     * @throws IllegalArgumentException
+     * @throws SchemaCompatibilityException
      */
     public ColumnUpdateChange updateColumnComment(String name, String newDoc) {
       checkColModifyIsLegal(name);
       Types.Field field = internalSchema.findField(name);
       if (field == null) {
-        throw new IllegalArgumentException(String.format("cannot update a 
missing column: %s", name));
+        throw new SchemaCompatibilityException(String.format("Cannot update 
comment for column '%s' because it does not exist in the schema", name));
       }
       // consider null
       if (Objects.equals(field.doc(), newDoc)) {
@@ -148,19 +150,19 @@ public class TableChanges {
      * @param name name of the column to rename
      * @param newName new name for the column
      * @return this
-     * @throws IllegalArgumentException
+     * @throws SchemaCompatibilityException
      */
     public ColumnUpdateChange renameColumn(String name, String newName) {
       checkColModifyIsLegal(name);
       Types.Field field = internalSchema.findField(name);
       if (field == null) {
-        throw new IllegalArgumentException(String.format("cannot update a 
missing column: %s", name));
+        throw new SchemaCompatibilityException(String.format("Cannot rename 
column '%s' because it does not exist in the schema", name));
       }
       if (newName == null || newName.isEmpty()) {
-        throw new IllegalArgumentException(String.format("cannot rename 
column: %s to empty", name));
+        throw new SchemaCompatibilityException(String.format("Cannot rename 
column '%s' to empty or null name. New name must be non-empty", name));
       }
       if (internalSchema.hasColumn(newName, caseSensitive)) {
-        throw new IllegalArgumentException(String.format("cannot rename 
column: %s to a existing name", name));
+        throw new SchemaCompatibilityException(String.format("Cannot rename 
column '%s' to '%s' because a column with name '%s' already exists in the 
schema", name, newName, newName));
       }
       // save update info
       Types.Field update = updates.get(field.fieldId());
@@ -179,7 +181,7 @@ public class TableChanges {
      * @param name name of the column to update
      * @param nullable nullable for updated name
      * @return this
-     * @throws IllegalArgumentException
+     * @throws SchemaCompatibilityException
      */
     public ColumnUpdateChange updateColumnNullability(String name, boolean 
nullable) {
       return updateColumnNullability(name, nullable, false);
@@ -189,14 +191,15 @@ public class TableChanges {
       checkColModifyIsLegal(name);
       Types.Field field = internalSchema.findField(name);
       if (field == null) {
-        throw new IllegalArgumentException(String.format("cannot update a 
missing column: %s", name));
+        throw new SchemaCompatibilityException(String.format("Cannot update 
nullability for column '%s' because it does not exist in the schema", name));
       }
       if (field.isOptional() == nullable) {
         // do nothings
         return this;
       }
       if (field.isOptional() && !nullable && !force) {
-        throw new IllegalArgumentException("cannot update column Nullability: 
optional to required");
+        throw new SchemaCompatibilityException(String.format(
+            "Cannot change column '%s' from optional to required. This would 
break compatibility with existing data that may contain null values", name));
       }
       // save update info
       Types.Field update = updates.get(field.fieldId());
@@ -224,7 +227,7 @@ public class TableChanges {
       if (field != null) {
         return field.fieldId();
       } else {
-        throw new IllegalArgumentException(String.format("cannot find col id 
for given column fullName: %s", fullName));
+        throw new SchemaCompatibilityException(String.format("Cannot find 
column ID for column path '%s'. The column may not exist in the schema", 
fullName));
       }
     }
 
@@ -239,7 +242,7 @@ public class TableChanges {
 
   /** Deal with delete columns changes for table. */
   public static class ColumnDeleteChange extends TableChange.BaseColumnChange {
-    private final Set deletes = new HashSet<>();
+    private final Set<Integer> deletes = new HashSet<>();
 
     @Override
     public ColumnChangeID columnChangeId() {
@@ -268,7 +271,7 @@ public class TableChanges {
       checkColModifyIsLegal(name);
       Types.Field field = internalSchema.findField(name);
       if (field == null) {
-        throw new IllegalArgumentException(String.format("cannot delete 
missing columns: %s", name));
+        throw new SchemaCompatibilityException(String.format("Cannot delete 
column '%s' because it does not exist in the schema", name));
       }
       deletes.add(field.fieldId());
       return this;
@@ -335,25 +338,25 @@ public class TableChanges {
       if (!parent.isEmpty()) {
         Types.Field parentField = internalSchema.findField(parent);
         if (parentField == null) {
-          throw new HoodieSchemaException(String.format("cannot add column: %s 
which parent: %s is not exist", name, parent));
+          throw new HoodieSchemaException(String.format("Cannot add column 
'%s' because its parent column '%s' does not exist in the schema", name, 
parent));
         }
-        Type parentType = parentField.type();
         if (!(parentField.type() instanceof Types.RecordType)) {
-          throw new HoodieSchemaException("only support add nested columns to 
struct column");
+          throw new HoodieSchemaException(String.format(
+              "Cannot add nested column '%s' to parent '%s' of type '%s'. 
Nested columns can only be added to struct/record types", name, parent, 
parentField.type()));
         }
         parentId = parentField.fieldId();
         Types.Field newParentField = internalSchema.findField(parent + "."  + 
name);
         if (newParentField != null) {
-          throw new HoodieSchemaException(String.format("cannot add column: %s 
which already exist", name));
+          throw new HoodieSchemaException(String.format("Cannot add column 
'%s' to parent '%s' because the column already exists at path '%s'", name, 
parent, parent + "." + name));
         }
         fullName = parent + "." + name;
       } else {
         if (internalSchema.hasColumn(name, caseSensitive)) {
-          throw new HoodieSchemaException(String.format("cannot add column: %s 
which already exist", name));
+          throw new HoodieSchemaException(String.format("Cannot add column 
'%s' because it already exists in the schema", name));
         }
       }
       if (fullColName2Id.containsKey(fullName)) {
-        throw new HoodieSchemaException(String.format("cannot repeat add 
column: %s", name));
+        throw new HoodieSchemaException(String.format("Cannot add column '%s' 
multiple times. Column at path '%s' has already been added in this change set", 
name, fullName));
       }
       fullColName2Id.put(fullName, nextId);
       if (parentId != -1) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQL.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQL.java
index a98e11727983..518fd920589b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQL.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQL.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
 import org.apache.hudi.adapter.TestHoodieCatalogs;
 import org.apache.hudi.exception.HoodieCatalogException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
 import org.apache.hudi.utils.FlinkMiniCluster;
 import org.apache.hudi.utils.TestTableEnvs;
 
@@ -191,8 +192,9 @@ public abstract class ITTestSchemaEvolutionBySQL {
         TableException.class,
         () -> tableEnv.executeSql(alterSql),
         "Should throw exception when the type update is not allowed ");
-    assertTrue(e.getCause() instanceof IllegalArgumentException);
-    assertTrue(e.getCause().getMessage().contains("cannot update origin type: 
string to a incompatibility type: int"));
+    assertTrue(e.getCause() instanceof SchemaCompatibilityException);
+    assertTrue(e.getCause().getMessage().contains("Cannot update column 
'f_str' from type 'string' to incompatible type 'int'."),
+        e.getCause().getMessage());
   }
 
   @Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index ee13b61dd71c..1a5e17893fa1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -363,11 +363,10 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
         }
         // check duplicate add or rename
         // keep consistent with hive, column names insensitive
-        checkExceptions(s"alter table $tableName rename column col0 to 
col9")(Seq("cannot rename column: col0 to a existing name",
-          "Cannot rename column, because col9 already exists in root"))
-        checkExceptions(s"alter table $tableName rename column col0 to 
COL9")(Seq("cannot rename column: col0 to a existing name", "Cannot rename 
column, because COL9 already exists in root"))
-        checkExceptions(s"alter table $tableName add columns(col9 int 
first)")(Seq("cannot add column: col9 which already exist", "Cannot add column, 
because col9 already exists in root"))
-        checkExceptions(s"alter table $tableName add columns(COL9 int 
first)")(Seq("cannot add column: COL9 which already exist", "Cannot add column, 
because COL9 already exists in root"))
+        checkExceptions(s"alter table $tableName rename column col0 to 
col9")(Seq("Cannot rename column 'col0' to 'col9' because a column with name 
'col9' already exists in the schema"))
+        checkExceptions(s"alter table $tableName rename column col0 to 
COL9")(Seq("Cannot rename column 'col0' to 'COL9' because a column with name 
'COL9' already exists in the schema"))
+        checkExceptions(s"alter table $tableName add columns(col9 int 
first)")(Seq("Cannot add column 'col9' because it already exists in the 
schema"))
+        checkExceptions(s"alter table $tableName add columns(COL9 int 
first)")(Seq("Cannot add column 'COL9' because it already exists in the 
schema"))
         // test add comment for columns / alter columns comment
         spark.sql(s"alter table $tableName add columns(col1_new int comment 
'add new columns col1_new after id' after id)")
         spark.sql(s"alter table $tableName alter column col9 comment 'col9 
desc'")

Reply via email to