JingsongLi commented on code in PR #370:
URL: https://github.com/apache/paimon-rust/pull/370#discussion_r3418347742
##########
crates/paimon/src/spec/schema.rs:
##########
@@ -140,32 +146,268 @@ impl TableSchema {
}
/// Apply a list of schema changes and return a new schema with
incremented ID.
+ ///
+ /// Column-level changes operate on **top-level** columns only: a
+ /// `field_names` path with more than one element (a nested struct field)
is
+ /// rejected with [`crate::Error::Unsupported`].
+ ///
+ /// Column errors ([`crate::Error::ColumnNotExist`] /
+ /// [`crate::Error::ColumnAlreadyExist`]) are returned with an empty table
+ /// name; the calling catalog fills in the table's full name.
pub fn apply_changes(&self, changes: Vec<crate::spec::SchemaChange>) ->
crate::Result<Self> {
+ use crate::spec::SchemaChange;
+
+ // Column errors carry no table name here; the catalog layer fills it
in.
+ let full_name = "";
+
+ // Both flags are read from the pre-alter options, mirroring Java
+ // `SchemaManager.applySchemaChanges`.
+ let disable_null_to_not_null = self
+ .options
+ .get(crate::spec::DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL_OPTION)
+ .map(|v| v == "true")
+ .unwrap_or(true);
+ let allow_explicit_cast = self
+ .options
+ .get(crate::spec::DISABLE_EXPLICIT_TYPE_CASTING_OPTION)
+ .map(|v| v != "true")
+ .unwrap_or(true);
+
let mut new_schema = self.clone();
new_schema.id += 1;
new_schema.time_millis = chrono::Utc::now().timestamp_millis();
+ // Operate on an owned field list, then write it back.
+ let mut fields = std::mem::take(&mut new_schema.fields);
+ let mut highest_field_id = new_schema.highest_field_id;
+
for change in changes {
match change {
- crate::spec::SchemaChange::SetOption { key, value } => {
+ SchemaChange::SetOption { key, value } => {
new_schema.options.insert(key, value);
}
- crate::spec::SchemaChange::RemoveOption { key } => {
+ SchemaChange::RemoveOption { key } => {
new_schema.options.remove(&key);
}
- other => {
- return Err(crate::Error::Unsupported {
- message: format!("Schema change not yet supported:
{other:?}"),
- });
+ SchemaChange::UpdateComment { comment } => {
+ new_schema.comment = comment;
+ }
+ SchemaChange::AddColumn {
+ field_names,
+ data_type,
+ comment,
+ column_move,
+ } => {
+ let name = top_level_field(&field_names)?;
+ if field_index(&fields, name).is_some() {
+ return Err(crate::Error::ColumnAlreadyExist {
+ full_name: full_name.to_string(),
+ column: name.to_string(),
+ });
+ }
+ // Mirrors Java: an added column has no value for existing
+ // rows, so it must be nullable.
+ if !data_type.is_nullable() {
+ return Err(crate::Error::ConfigInvalid {
+ message: format!("Column {name} cannot specify NOT
NULL."),
+ });
+ }
+ highest_field_id += 1;
+ let id = highest_field_id;
+ let data_type = reassign_field_ids(data_type, &mut
highest_field_id);
+ let field =
+ DataField::new(id, name.to_string(),
data_type).with_description(comment);
+ insert_field_with_move(&mut fields, field,
column_move.as_ref(), full_name)?;
+ }
+ SchemaChange::RenameColumn {
+ field_names,
+ new_name,
+ } => {
+ let name = top_level_field(&field_names)?;
+ // Existing partition data is laid out with the old key
name
+ // in paths and metadata; renaming would break resolution.
+ if new_schema.partition_keys.iter().any(|k| k == name) {
+ return Err(crate::Error::Unsupported {
+ message: format!("Cannot rename partition column:
[{name}]"),
+ });
+ }
+ let idx =
+ field_index(&fields, name).ok_or_else(||
crate::Error::ColumnNotExist {
+ full_name: full_name.to_string(),
+ column: name.to_string(),
+ })?;
+ if new_name != name && field_index(&fields,
&new_name).is_some() {
+ return Err(crate::Error::ColumnAlreadyExist {
+ full_name: full_name.to_string(),
+ column: new_name,
+ });
+ }
+ fields[idx] =
fields[idx].clone().with_name(new_name.clone());
+ rename_in_keys(&mut new_schema.primary_keys, name,
&new_name);
+ rename_in_option_list(
+ &mut new_schema.options,
+ BUCKET_KEY_OPTION,
+ name,
+ &new_name,
+ );
+ rename_in_option_list(
Review Comment:
This only rewrites `bucket-key` and `sequence.field`, but aggregation tables
also have field-scoped options such as `fields.<col>.aggregate-function` and
`fields.<col>.list-agg-delimiter`. After renaming `tag` to `label`,
`fields.tag.list-agg-delimiter` remains under the old key;
`ListaggAgg::new("label", ...)` will not find it and will silently switch back
to the default delimiter. Java rewrites these field-scoped option keys in
`applyRenameColumnsToOptions`, so we should update them here as well to
preserve aggregation semantics across renames.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]