JingsongLi commented on code in PR #370:
URL: https://github.com/apache/paimon-rust/pull/370#discussion_r3394215830


##########
crates/paimon/src/spec/schema.rs:
##########
@@ -127,27 +134,204 @@ impl TableSchema {
     }
 
     /// Apply a list of schema changes and return a new schema with 
incremented ID.
+    ///
+    /// Column-level changes operate on **top-level** columns only: a
+    /// `field_names` path with more than one element (a nested struct field) 
is
+    /// rejected with [`crate::Error::Unsupported`].
+    ///
+    /// Column errors ([`crate::Error::ColumnNotExist`] /
+    /// [`crate::Error::ColumnAlreadyExist`]) are returned with an empty table
+    /// name; the calling catalog fills in the table's full name.
     pub fn apply_changes(&self, changes: Vec<crate::spec::SchemaChange>) -> 
crate::Result<Self> {
+        use crate::spec::SchemaChange;
+
+        // Column errors carry no table name here; the catalog layer fills it 
in.
+        let full_name = "";
+
         let mut new_schema = self.clone();
         new_schema.id += 1;
         new_schema.time_millis = chrono::Utc::now().timestamp_millis();
 
+        // Operate on an owned field list, then write it back.
+        let mut fields = std::mem::take(&mut new_schema.fields);
+        let mut highest_field_id = new_schema.highest_field_id;
+
         for change in changes {
             match change {
-                crate::spec::SchemaChange::SetOption { key, value } => {
+                SchemaChange::SetOption { key, value } => {
                     new_schema.options.insert(key, value);
                 }
-                crate::spec::SchemaChange::RemoveOption { key } => {
+                SchemaChange::RemoveOption { key } => {
                     new_schema.options.remove(&key);
                 }
-                other => {
-                    return Err(crate::Error::Unsupported {
-                        message: format!("Schema change not yet supported: 
{other:?}"),
-                    });
+                SchemaChange::UpdateComment { comment } => {
+                    new_schema.comment = comment;
+                }
+                SchemaChange::AddColumn {
+                    field_names,
+                    data_type,
+                    comment,
+                    column_move,
+                } => {
+                    let name = top_level_field(&field_names)?;
+                    if field_index(&fields, name).is_some() {
+                        return Err(crate::Error::ColumnAlreadyExist {
+                            full_name: full_name.to_string(),
+                            column: name.to_string(),
+                        });
+                    }
+                    // Mirrors Java: an added column has no value for existing
+                    // rows, so it must be nullable.
+                    if !data_type.is_nullable() {
+                        return Err(crate::Error::ConfigInvalid {
+                            message: format!("Column {name} cannot specify NOT 
NULL."),
+                        });
+                    }
+                    highest_field_id += 1;
+                    let id = highest_field_id;
+                    let data_type = reassign_field_ids(data_type, &mut 
highest_field_id);
+                    let field =
+                        DataField::new(id, name.to_string(), 
data_type).with_description(comment);
+                    insert_field_with_move(&mut fields, field, 
column_move.as_ref(), full_name)?;
+                }
+                SchemaChange::RenameColumn {
+                    field_names,
+                    new_name,
+                } => {
+                    let name = top_level_field(&field_names)?;
+                    // Existing partition data is laid out with the old key 
name
+                    // in paths and metadata; renaming would break resolution.
+                    if new_schema.partition_keys.iter().any(|k| k == name) {
+                        return Err(crate::Error::Unsupported {
+                            message: format!("Cannot rename partition column: 
[{name}]"),
+                        });
+                    }
+                    let idx =
+                        field_index(&fields, name).ok_or_else(|| 
crate::Error::ColumnNotExist {
+                            full_name: full_name.to_string(),
+                            column: name.to_string(),
+                        })?;
+                    if new_name != name && field_index(&fields, 
&new_name).is_some() {
+                        return Err(crate::Error::ColumnAlreadyExist {
+                            full_name: full_name.to_string(),
+                            column: new_name,
+                        });
+                    }
+                    fields[idx] = 
fields[idx].clone().with_name(new_name.clone());
+                    rename_in_keys(&mut new_schema.primary_keys, name, 
&new_name);
+                    rename_in_option_list(
+                        &mut new_schema.options,
+                        BUCKET_KEY_OPTION,
+                        name,
+                        &new_name,
+                    );
+                    rename_in_option_list(
+                        &mut new_schema.options,
+                        SEQUENCE_FIELD_OPTION,
+                        name,
+                        &new_name,
+                    );
+                }
+                SchemaChange::DropColumn { field_names } => {
+                    let name = top_level_field(&field_names)?;
+                    let idx =
+                        field_index(&fields, name).ok_or_else(|| 
crate::Error::ColumnNotExist {
+                            full_name: full_name.to_string(),
+                            column: name.to_string(),
+                        })?;
+                    if new_schema.partition_keys.iter().any(|k| k == name)
+                        || new_schema.primary_keys.iter().any(|k| k == name)
+                    {
+                        return Err(crate::Error::Unsupported {
+                            message: format!(
+                                "Cannot drop partition or primary key column 
'{name}' of table {full_name}"
+                            ),
+                        });
+                    }
+                    if fields.len() == 1 {
+                        return Err(crate::Error::Unsupported {
+                            message: "Cannot drop all fields in 
table".to_string(),
+                        });
+                    }
+                    fields.remove(idx);
+                }
+                SchemaChange::UpdateColumnType {
+                    field_names,
+                    new_data_type,
+                    keep_nullability,
+                } => {
+                    let name = top_level_field(&field_names)?;
+                    // Existing partitions, bucket assignment, and key encoding
+                    // were all written with the old key type.
+                    if new_schema.partition_keys.iter().any(|k| k == name) {
+                        return Err(crate::Error::Unsupported {
+                            message: format!("Cannot update partition column: 
[{name}]"),
+                        });
+                    }
+                    if new_schema.primary_keys.iter().any(|k| k == name) {
+                        return Err(crate::Error::Unsupported {
+                            message: "Cannot update primary key".to_string(),
+                        });
+                    }
+                    let idx =
+                        field_index(&fields, name).ok_or_else(|| 
crate::Error::ColumnNotExist {
+                            full_name: full_name.to_string(),
+                            column: name.to_string(),
+                        })?;
+                    let old = &fields[idx];
+                    // Lenient: replace the type without cast-compatibility 
checks.
+                    let target = if keep_nullability {
+                        
new_data_type.copy_with_nullable(old.data_type().is_nullable())?
+                    } else {
+                        new_data_type
+                    };
+                    fields[idx] = DataField::new(old.id(), 
old.name().to_string(), target)
+                        .with_description(old.description().map(|s| 
s.to_string()));
+                }
+                SchemaChange::UpdateColumnNullability {
+                    field_names,
+                    new_nullability,
+                } => {
+                    let name = top_level_field(&field_names)?;
+                    // Primary keys are normalized to NOT NULL at create time;
+                    // a nullable key column would break key/bucket semantics.
+                    if new_nullability && 
new_schema.primary_keys.iter().any(|k| k == name) {
+                        return Err(crate::Error::Unsupported {
+                            message: "Cannot change nullability of primary 
key".to_string(),
+                        });
+                    }
+                    let idx =
+                        field_index(&fields, name).ok_or_else(|| 
crate::Error::ColumnNotExist {
+                            full_name: full_name.to_string(),
+                            column: name.to_string(),
+                        })?;
+                    let old = &fields[idx];
+                    let nt = 
old.data_type().copy_with_nullable(new_nullability)?;
+                    fields[idx] = DataField::new(old.id(), 
old.name().to_string(), nt)
+                        .with_description(old.description().map(|s| 
s.to_string()));
+                }
+                SchemaChange::UpdateColumnComment {
+                    field_names,
+                    new_comment,
+                } => {
+                    let name = top_level_field(&field_names)?;
+                    let idx =
+                        field_index(&fields, name).ok_or_else(|| 
crate::Error::ColumnNotExist {
+                            full_name: full_name.to_string(),
+                            column: name.to_string(),
+                        })?;
+                    fields[idx] = 
fields[idx].clone().with_description(Some(new_comment));
+                }
+                SchemaChange::UpdateColumnPosition { column_move } => {
+                    apply_move(&mut fields, &column_move, full_name)?;
                 }
             }
         }
 
+        new_schema.fields = fields;
+        new_schema.highest_field_id =
+            
highest_field_id.max(Self::current_highest_field_id(&new_schema.fields));
+
         Schema::validate_first_row_changelog_producer(&new_schema.options)?;

Review Comment:
   After applying column and option changes, this only re-runs the first-row 
changelog validation. Create-time schema validation also enforces invariants 
such as BLOB columns requiring `data-evolution.enabled` and partial-update 
options being limited to the supported basic mode, but alter can now add/update 
a column to BLOB or set unsupported partial-update options and still persist 
the schema. Please re-run the relevant final-schema validations before saving 
the new schema.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to