JingsongLi commented on code in PR #370:
URL: https://github.com/apache/paimon-rust/pull/370#discussion_r3389053238
##########
crates/paimon/src/spec/schema.rs:
##########
@@ -127,27 +127,153 @@ impl TableSchema {
}
/// Apply a list of schema changes and return a new schema with
incremented ID.
+ ///
+ /// Column-level changes operate on **top-level** columns only: a
+ /// `field_names` path with more than one element (a nested struct field)
is
+ /// rejected with [`crate::Error::Unsupported`].
+ ///
+ /// Column errors ([`crate::Error::ColumnNotExist`] /
+ /// [`crate::Error::ColumnAlreadyExist`]) are returned with an empty table
+ /// name; the calling catalog fills in the table's full name.
pub fn apply_changes(&self, changes: Vec<crate::spec::SchemaChange>) ->
crate::Result<Self> {
+ use crate::spec::SchemaChange;
+
+ // Column errors carry no table name here; the catalog layer fills it
in.
+ let full_name = "";
+
let mut new_schema = self.clone();
new_schema.id += 1;
new_schema.time_millis = chrono::Utc::now().timestamp_millis();
+ // Operate on an owned field list, then write it back.
+ let mut fields = std::mem::take(&mut new_schema.fields);
+ let mut highest_field_id = new_schema.highest_field_id;
+
for change in changes {
match change {
- crate::spec::SchemaChange::SetOption { key, value } => {
+ SchemaChange::SetOption { key, value } => {
new_schema.options.insert(key, value);
}
- crate::spec::SchemaChange::RemoveOption { key } => {
+ SchemaChange::RemoveOption { key } => {
new_schema.options.remove(&key);
}
- other => {
- return Err(crate::Error::Unsupported {
- message: format!("Schema change not yet supported:
{other:?}"),
- });
+ SchemaChange::UpdateComment { comment } => {
+ new_schema.comment = comment;
+ }
+ SchemaChange::AddColumn {
+ field_names,
+ data_type,
+ comment,
+ column_move,
+ } => {
+ let name = top_level_field(&field_names)?;
+ if field_index(&fields, name).is_some() {
+ return Err(crate::Error::ColumnAlreadyExist {
+ full_name: full_name.to_string(),
+ column: name.to_string(),
+ });
+ }
+ highest_field_id += 1;
+ let field = DataField::new(highest_field_id,
name.to_string(), data_type)
+ .with_description(comment);
+ insert_field_with_move(&mut fields, field,
column_move.as_ref(), full_name)?;
+ }
+ SchemaChange::RenameColumn {
+ field_names,
+ new_name,
+ } => {
+ let name = top_level_field(&field_names)?;
+ let idx =
+ field_index(&fields, name).ok_or_else(||
crate::Error::ColumnNotExist {
+ full_name: full_name.to_string(),
+ column: name.to_string(),
+ })?;
+ if new_name != name && field_index(&fields,
&new_name).is_some() {
+ return Err(crate::Error::ColumnAlreadyExist {
+ full_name: full_name.to_string(),
+ column: new_name,
+ });
+ }
+ fields[idx] =
fields[idx].clone().with_name(new_name.clone());
+ rename_in_keys(&mut new_schema.partition_keys, name,
&new_name);
+ rename_in_keys(&mut new_schema.primary_keys, name,
&new_name);
+ }
+ SchemaChange::DropColumn { field_names } => {
+ let name = top_level_field(&field_names)?;
+ let idx =
+ field_index(&fields, name).ok_or_else(||
crate::Error::ColumnNotExist {
+ full_name: full_name.to_string(),
+ column: name.to_string(),
+ })?;
+ if new_schema.partition_keys.iter().any(|k| k == name)
+ || new_schema.primary_keys.iter().any(|k| k == name)
+ {
+ return Err(crate::Error::Unsupported {
+ message: format!(
+ "Cannot drop partition or primary key column
'{name}' of table {full_name}"
+ ),
+ });
+ }
+ fields.remove(idx);
Review Comment:
This allows dropping the last non-key column and persisting a schema with
`fields: []`. Java rejects this (`Cannot drop all fields in table`), and
several Rust paths assume a table schema still has data fields for bucket keys,
scans, writes, and partition/key resolution. Please reject the drop when it
would leave the table with no fields.
##########
crates/paimon/src/spec/schema.rs:
##########
@@ -127,27 +127,153 @@ impl TableSchema {
}
/// Apply a list of schema changes and return a new schema with
incremented ID.
+ ///
+ /// Column-level changes operate on **top-level** columns only: a
+ /// `field_names` path with more than one element (a nested struct field)
is
+ /// rejected with [`crate::Error::Unsupported`].
+ ///
+ /// Column errors ([`crate::Error::ColumnNotExist`] /
+ /// [`crate::Error::ColumnAlreadyExist`]) are returned with an empty table
+ /// name; the calling catalog fills in the table's full name.
pub fn apply_changes(&self, changes: Vec<crate::spec::SchemaChange>) ->
crate::Result<Self> {
+ use crate::spec::SchemaChange;
+
+ // Column errors carry no table name here; the catalog layer fills it
in.
+ let full_name = "";
+
let mut new_schema = self.clone();
new_schema.id += 1;
new_schema.time_millis = chrono::Utc::now().timestamp_millis();
+ // Operate on an owned field list, then write it back.
+ let mut fields = std::mem::take(&mut new_schema.fields);
+ let mut highest_field_id = new_schema.highest_field_id;
+
for change in changes {
match change {
- crate::spec::SchemaChange::SetOption { key, value } => {
+ SchemaChange::SetOption { key, value } => {
new_schema.options.insert(key, value);
}
- crate::spec::SchemaChange::RemoveOption { key } => {
+ SchemaChange::RemoveOption { key } => {
new_schema.options.remove(&key);
}
- other => {
- return Err(crate::Error::Unsupported {
- message: format!("Schema change not yet supported:
{other:?}"),
- });
+ SchemaChange::UpdateComment { comment } => {
+ new_schema.comment = comment;
+ }
+ SchemaChange::AddColumn {
+ field_names,
+ data_type,
+ comment,
+ column_move,
+ } => {
+ let name = top_level_field(&field_names)?;
+ if field_index(&fields, name).is_some() {
+ return Err(crate::Error::ColumnAlreadyExist {
+ full_name: full_name.to_string(),
+ column: name.to_string(),
+ });
+ }
+ highest_field_id += 1;
+ let field = DataField::new(highest_field_id,
name.to_string(), data_type)
+ .with_description(comment);
+ insert_field_with_move(&mut fields, field,
column_move.as_ref(), full_name)?;
+ }
+ SchemaChange::RenameColumn {
+ field_names,
+ new_name,
+ } => {
+ let name = top_level_field(&field_names)?;
+ let idx =
+ field_index(&fields, name).ok_or_else(||
crate::Error::ColumnNotExist {
+ full_name: full_name.to_string(),
+ column: name.to_string(),
+ })?;
+ if new_name != name && field_index(&fields,
&new_name).is_some() {
+ return Err(crate::Error::ColumnAlreadyExist {
+ full_name: full_name.to_string(),
+ column: new_name,
+ });
+ }
+ fields[idx] =
fields[idx].clone().with_name(new_name.clone());
+ rename_in_keys(&mut new_schema.partition_keys, name,
&new_name);
+ rename_in_keys(&mut new_schema.primary_keys, name,
&new_name);
+ }
+ SchemaChange::DropColumn { field_names } => {
+ let name = top_level_field(&field_names)?;
+ let idx =
+ field_index(&fields, name).ok_or_else(||
crate::Error::ColumnNotExist {
+ full_name: full_name.to_string(),
+ column: name.to_string(),
+ })?;
+ if new_schema.partition_keys.iter().any(|k| k == name)
+ || new_schema.primary_keys.iter().any(|k| k == name)
+ {
+ return Err(crate::Error::Unsupported {
+ message: format!(
+ "Cannot drop partition or primary key column
'{name}' of table {full_name}"
+ ),
+ });
+ }
+ fields.remove(idx);
+ }
+ SchemaChange::UpdateColumnType {
+ field_names,
+ new_data_type,
+ keep_nullability,
+ } => {
+ let name = top_level_field(&field_names)?;
+ let idx =
+ field_index(&fields, name).ok_or_else(||
crate::Error::ColumnNotExist {
+ full_name: full_name.to_string(),
+ column: name.to_string(),
+ })?;
+ let old = &fields[idx];
+ // Lenient: replace the type without cast-compatibility
checks.
+ let target = if keep_nullability {
+
new_data_type.copy_with_nullable(old.data_type().is_nullable())?
+ } else {
+ new_data_type
+ };
+ fields[idx] = DataField::new(old.id(),
old.name().to_string(), target)
Review Comment:
Type changes on key columns should be rejected before persisting the new
schema. Java rejects updating partition keys and primary keys here because
existing partitions, bucket assignment, key encoding, and manifests were
written with the old type. This branch currently permits `updateColumnType` on
those columns, so a filesystem catalog can write an incompatible schema version
that later readers/writers interpret with the wrong key type. Please add the
same partition-key/primary-key guard for type updates.
##########
crates/paimon/src/spec/schema.rs:
##########
@@ -127,27 +127,153 @@ impl TableSchema {
}
/// Apply a list of schema changes and return a new schema with
incremented ID.
+ ///
+ /// Column-level changes operate on **top-level** columns only: a
+ /// `field_names` path with more than one element (a nested struct field)
is
+ /// rejected with [`crate::Error::Unsupported`].
+ ///
+ /// Column errors ([`crate::Error::ColumnNotExist`] /
+ /// [`crate::Error::ColumnAlreadyExist`]) are returned with an empty table
+ /// name; the calling catalog fills in the table's full name.
pub fn apply_changes(&self, changes: Vec<crate::spec::SchemaChange>) ->
crate::Result<Self> {
+ use crate::spec::SchemaChange;
+
+ // Column errors carry no table name here; the catalog layer fills it
in.
+ let full_name = "";
+
let mut new_schema = self.clone();
new_schema.id += 1;
new_schema.time_millis = chrono::Utc::now().timestamp_millis();
+ // Operate on an owned field list, then write it back.
+ let mut fields = std::mem::take(&mut new_schema.fields);
+ let mut highest_field_id = new_schema.highest_field_id;
+
for change in changes {
match change {
- crate::spec::SchemaChange::SetOption { key, value } => {
+ SchemaChange::SetOption { key, value } => {
new_schema.options.insert(key, value);
}
- crate::spec::SchemaChange::RemoveOption { key } => {
+ SchemaChange::RemoveOption { key } => {
new_schema.options.remove(&key);
}
- other => {
- return Err(crate::Error::Unsupported {
- message: format!("Schema change not yet supported:
{other:?}"),
- });
+ SchemaChange::UpdateComment { comment } => {
+ new_schema.comment = comment;
+ }
+ SchemaChange::AddColumn {
+ field_names,
+ data_type,
+ comment,
+ column_move,
+ } => {
+ let name = top_level_field(&field_names)?;
+ if field_index(&fields, name).is_some() {
+ return Err(crate::Error::ColumnAlreadyExist {
+ full_name: full_name.to_string(),
+ column: name.to_string(),
+ });
+ }
+ highest_field_id += 1;
+ let field = DataField::new(highest_field_id,
name.to_string(), data_type)
+ .with_description(comment);
+ insert_field_with_move(&mut fields, field,
column_move.as_ref(), full_name)?;
+ }
+ SchemaChange::RenameColumn {
+ field_names,
+ new_name,
+ } => {
+ let name = top_level_field(&field_names)?;
+ let idx =
+ field_index(&fields, name).ok_or_else(||
crate::Error::ColumnNotExist {
+ full_name: full_name.to_string(),
+ column: name.to_string(),
+ })?;
+ if new_name != name && field_index(&fields,
&new_name).is_some() {
+ return Err(crate::Error::ColumnAlreadyExist {
+ full_name: full_name.to_string(),
+ column: new_name,
+ });
+ }
+ fields[idx] =
fields[idx].clone().with_name(new_name.clone());
+ rename_in_keys(&mut new_schema.partition_keys, name,
&new_name);
+ rename_in_keys(&mut new_schema.primary_keys, name,
&new_name);
+ }
+ SchemaChange::DropColumn { field_names } => {
+ let name = top_level_field(&field_names)?;
+ let idx =
+ field_index(&fields, name).ok_or_else(||
crate::Error::ColumnNotExist {
+ full_name: full_name.to_string(),
+ column: name.to_string(),
+ })?;
+ if new_schema.partition_keys.iter().any(|k| k == name)
+ || new_schema.primary_keys.iter().any(|k| k == name)
+ {
+ return Err(crate::Error::Unsupported {
+ message: format!(
+ "Cannot drop partition or primary key column
'{name}' of table {full_name}"
+ ),
+ });
+ }
+ fields.remove(idx);
+ }
+ SchemaChange::UpdateColumnType {
+ field_names,
+ new_data_type,
+ keep_nullability,
+ } => {
+ let name = top_level_field(&field_names)?;
+ let idx =
+ field_index(&fields, name).ok_or_else(||
crate::Error::ColumnNotExist {
+ full_name: full_name.to_string(),
+ column: name.to_string(),
+ })?;
+ let old = &fields[idx];
+ // Lenient: replace the type without cast-compatibility
checks.
+ let target = if keep_nullability {
+
new_data_type.copy_with_nullable(old.data_type().is_nullable())?
+ } else {
+ new_data_type
+ };
+ fields[idx] = DataField::new(old.id(),
old.name().to_string(), target)
+ .with_description(old.description().map(|s|
s.to_string()));
+ }
+ SchemaChange::UpdateColumnNullability {
+ field_names,
+ new_nullability,
+ } => {
+ let name = top_level_field(&field_names)?;
+ let idx =
+ field_index(&fields, name).ok_or_else(||
crate::Error::ColumnNotExist {
+ full_name: full_name.to_string(),
+ column: name.to_string(),
+ })?;
+ let old = &fields[idx];
+ let nt =
old.data_type().copy_with_nullable(new_nullability)?;
Review Comment:
This can make a primary-key column nullable (`newNullability = true`).
Primary keys are normalized to NOT NULL at create time, and Java rejects
changing primary-key nullability to nullable; otherwise subsequent writes can
accept NULLs in key columns and break key/bucket semantics. Please reject
`UpdateColumnNullability` when the target column is a primary key and
`new_nullability` is true.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]