This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f3b19cc4d71a [MINOR] TableChanges class throw schema evolution exception (#13581)
f3b19cc4d71a is described below
commit f3b19cc4d71af58670e9b1dbc0050b6e958ad2fc
Author: Davis-Zhang-Onehouse <[email protected]>
AuthorDate: Sat Jul 19 18:32:28 2025 -0700
[MINOR] TableChanges class throw schema evolution exception (#13581)
---
.../hudi/internal/schema/action/TableChanges.java | 47 ++++++++++++----------
.../hudi/table/ITTestSchemaEvolutionBySQL.java | 6 ++-
.../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala | 9 ++---
3 files changed, 33 insertions(+), 29 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChanges.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChanges.java
index d039e2e2ed5a..cd1cdb289951 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChanges.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChanges.java
@@ -18,6 +18,7 @@
package org.apache.hudi.internal.schema.action;
+import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.InternalSchemaBuilder;
@@ -83,20 +84,21 @@ public class TableChanges {
* @param name name of the column to update
* @param newType new type for the column
* @return this
- * @throws IllegalArgumentException
+ * @throws SchemaCompatibilityException
*/
public ColumnUpdateChange updateColumnType(String name, Type newType) {
checkColModifyIsLegal(name);
if (newType.isNestedType()) {
- throw new IllegalArgumentException(String.format("only support update
primitive type but find nest column: %s", name));
+ throw new SchemaCompatibilityException(String.format("Cannot update
column '%s' to nested type '%s'.", name, newType));
}
Types.Field field = internalSchema.findField(name);
if (field == null) {
- throw new IllegalArgumentException(String.format("cannot update a
missing column: %s", name));
+ throw new SchemaCompatibilityException(String.format("Cannot update
type for column '%s' because it does not exist in the schema", name));
}
if (!SchemaChangeUtils.isTypeUpdateAllow(field.type(), newType)) {
- throw new IllegalArgumentException(String.format("cannot update origin
type: %s to a incompatibility type: %s", field.type(), newType));
+ throw new SchemaCompatibilityException(String.format(
+ "Cannot update column '%s' from type '%s' to incompatible type
'%s'.", name, field.type(), newType));
}
if (field.type().equals(newType)) {
@@ -119,13 +121,13 @@ public class TableChanges {
* @param name name of the column to update
* @param newDoc new documentation for the column
* @return this
- * @throws IllegalArgumentException
+ * @throws SchemaCompatibilityException
*/
public ColumnUpdateChange updateColumnComment(String name, String newDoc) {
checkColModifyIsLegal(name);
Types.Field field = internalSchema.findField(name);
if (field == null) {
- throw new IllegalArgumentException(String.format("cannot update a
missing column: %s", name));
+ throw new SchemaCompatibilityException(String.format("Cannot update
comment for column '%s' because it does not exist in the schema", name));
}
// consider null
if (Objects.equals(field.doc(), newDoc)) {
@@ -148,19 +150,19 @@ public class TableChanges {
* @param name name of the column to rename
* @param newName new name for the column
* @return this
- * @throws IllegalArgumentException
+ * @throws SchemaCompatibilityException
*/
public ColumnUpdateChange renameColumn(String name, String newName) {
checkColModifyIsLegal(name);
Types.Field field = internalSchema.findField(name);
if (field == null) {
- throw new IllegalArgumentException(String.format("cannot update a
missing column: %s", name));
+ throw new SchemaCompatibilityException(String.format("Cannot rename
column '%s' because it does not exist in the schema", name));
}
if (newName == null || newName.isEmpty()) {
- throw new IllegalArgumentException(String.format("cannot rename
column: %s to empty", name));
+ throw new SchemaCompatibilityException(String.format("Cannot rename
column '%s' to empty or null name. New name must be non-empty", name));
}
if (internalSchema.hasColumn(newName, caseSensitive)) {
- throw new IllegalArgumentException(String.format("cannot rename
column: %s to a existing name", name));
+ throw new SchemaCompatibilityException(String.format("Cannot rename
column '%s' to '%s' because a column with name '%s' already exists in the
schema", name, newName, newName));
}
// save update info
Types.Field update = updates.get(field.fieldId());
@@ -179,7 +181,7 @@ public class TableChanges {
* @param name name of the column to update
* @param nullable nullable for updated name
* @return this
- * @throws IllegalArgumentException
+ * @throws SchemaCompatibilityException
*/
public ColumnUpdateChange updateColumnNullability(String name, boolean
nullable) {
return updateColumnNullability(name, nullable, false);
@@ -189,14 +191,15 @@ public class TableChanges {
checkColModifyIsLegal(name);
Types.Field field = internalSchema.findField(name);
if (field == null) {
- throw new IllegalArgumentException(String.format("cannot update a
missing column: %s", name));
+ throw new SchemaCompatibilityException(String.format("Cannot update
nullability for column '%s' because it does not exist in the schema", name));
}
if (field.isOptional() == nullable) {
// do nothings
return this;
}
if (field.isOptional() && !nullable && !force) {
- throw new IllegalArgumentException("cannot update column Nullability:
optional to required");
+ throw new SchemaCompatibilityException(String.format(
+ "Cannot change column '%s' from optional to required. This would
break compatibility with existing data that may contain null values", name));
}
// save update info
Types.Field update = updates.get(field.fieldId());
@@ -224,7 +227,7 @@ public class TableChanges {
if (field != null) {
return field.fieldId();
} else {
- throw new IllegalArgumentException(String.format("cannot find col id
for given column fullName: %s", fullName));
+ throw new SchemaCompatibilityException(String.format("Cannot find
column ID for column path '%s'. The column may not exist in the schema",
fullName));
}
}
@@ -239,7 +242,7 @@ public class TableChanges {
/** Deal with delete columns changes for table. */
public static class ColumnDeleteChange extends TableChange.BaseColumnChange {
- private final Set deletes = new HashSet<>();
+ private final Set<Integer> deletes = new HashSet<>();
@Override
public ColumnChangeID columnChangeId() {
@@ -268,7 +271,7 @@ public class TableChanges {
checkColModifyIsLegal(name);
Types.Field field = internalSchema.findField(name);
if (field == null) {
- throw new IllegalArgumentException(String.format("cannot delete
missing columns: %s", name));
+ throw new SchemaCompatibilityException(String.format("Cannot delete
column '%s' because it does not exist in the schema", name));
}
deletes.add(field.fieldId());
return this;
@@ -335,25 +338,25 @@ public class TableChanges {
if (!parent.isEmpty()) {
Types.Field parentField = internalSchema.findField(parent);
if (parentField == null) {
- throw new HoodieSchemaException(String.format("cannot add column: %s
which parent: %s is not exist", name, parent));
+ throw new HoodieSchemaException(String.format("Cannot add column
'%s' because its parent column '%s' does not exist in the schema", name,
parent));
}
- Type parentType = parentField.type();
if (!(parentField.type() instanceof Types.RecordType)) {
- throw new HoodieSchemaException("only support add nested columns to
struct column");
+ throw new HoodieSchemaException(String.format(
+ "Cannot add nested column '%s' to parent '%s' of type '%s'.
Nested columns can only be added to struct/record types", name, parent,
parentField.type()));
}
parentId = parentField.fieldId();
Types.Field newParentField = internalSchema.findField(parent + "." +
name);
if (newParentField != null) {
- throw new HoodieSchemaException(String.format("cannot add column: %s
which already exist", name));
+ throw new HoodieSchemaException(String.format("Cannot add column
'%s' to parent '%s' because the column already exists at path '%s'", name,
parent, parent + "." + name));
}
fullName = parent + "." + name;
} else {
if (internalSchema.hasColumn(name, caseSensitive)) {
- throw new HoodieSchemaException(String.format("cannot add column: %s
which already exist", name));
+ throw new HoodieSchemaException(String.format("Cannot add column
'%s' because it already exists in the schema", name));
}
}
if (fullColName2Id.containsKey(fullName)) {
- throw new HoodieSchemaException(String.format("cannot repeat add
column: %s", name));
+ throw new HoodieSchemaException(String.format("Cannot add column '%s'
multiple times. Column at path '%s' has already been added in this change set",
name, fullName));
}
fullColName2Id.put(fullName, nextId);
if (parentId != -1) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQL.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQL.java
index a98e11727983..518fd920589b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQL.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQL.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
import org.apache.hudi.adapter.TestHoodieCatalogs;
import org.apache.hudi.exception.HoodieCatalogException;
import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestTableEnvs;
@@ -191,8 +192,9 @@ public abstract class ITTestSchemaEvolutionBySQL {
TableException.class,
() -> tableEnv.executeSql(alterSql),
"Should throw exception when the type update is not allowed ");
- assertTrue(e.getCause() instanceof IllegalArgumentException);
- assertTrue(e.getCause().getMessage().contains("cannot update origin type:
string to a incompatibility type: int"));
+ assertTrue(e.getCause() instanceof SchemaCompatibilityException);
+ assertTrue(e.getCause().getMessage().contains("Cannot update column
'f_str' from type 'string' to incompatible type 'int'."),
+ e.getCause().getMessage());
}
@Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index ee13b61dd71c..1a5e17893fa1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -363,11 +363,10 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
// check duplicate add or rename
// keep consistent with hive, column names insensitive
- checkExceptions(s"alter table $tableName rename column col0 to
col9")(Seq("cannot rename column: col0 to a existing name",
- "Cannot rename column, because col9 already exists in root"))
- checkExceptions(s"alter table $tableName rename column col0 to
COL9")(Seq("cannot rename column: col0 to a existing name", "Cannot rename
column, because COL9 already exists in root"))
- checkExceptions(s"alter table $tableName add columns(col9 int
first)")(Seq("cannot add column: col9 which already exist", "Cannot add column,
because col9 already exists in root"))
- checkExceptions(s"alter table $tableName add columns(COL9 int
first)")(Seq("cannot add column: COL9 which already exist", "Cannot add column,
because COL9 already exists in root"))
+ checkExceptions(s"alter table $tableName rename column col0 to
col9")(Seq("Cannot rename column 'col0' to 'col9' because a column with name
'col9' already exists in the schema"))
+ checkExceptions(s"alter table $tableName rename column col0 to
COL9")(Seq("Cannot rename column 'col0' to 'COL9' because a column with name
'COL9' already exists in the schema"))
+ checkExceptions(s"alter table $tableName add columns(col9 int
first)")(Seq("Cannot add column 'col9' because it already exists in the
schema"))
+ checkExceptions(s"alter table $tableName add columns(COL9 int
first)")(Seq("Cannot add column 'COL9' because it already exists in the
schema"))
// test add comment for columns / alter columns comment
spark.sql(s"alter table $tableName add columns(col1_new int comment
'add new columns col1_new after id' after id)")
spark.sql(s"alter table $tableName alter column col9 comment 'col9
desc'")