This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ec38d14 feat: add schema change (#27)
ec38d14 is described below
commit ec38d14464526af5b31a820fc75a7b2ab3c02b13
Author: ErXi <[email protected]>
AuthorDate: Mon Aug 12 10:21:53 2024 +0800
feat: add schema change (#27)
---
crates/paimon/Cargo.toml | 2 +-
crates/paimon/src/spec/mod.rs | 3 +
crates/paimon/src/spec/schema_change.rs | 431 ++++++++++++++++++++++++++++++++
3 files changed, 435 insertions(+), 1 deletion(-)
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 99089f0..0d2b9dd 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -32,7 +32,7 @@ chrono = { version = "0.4.38", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde_bytes = "0.11.15"
serde_json = "1.0.120"
-serde_with = "3.8.3"
+serde_with = "3.9.0"
snafu = "0.8.3"
typed-builder = "^0.19"
opendal = "0.48"
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index a2d1fa3..80fb47d 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -25,6 +25,9 @@ pub use data_file::*;
mod schema;
pub use schema::*;
+mod schema_change;
+pub use schema_change::*;
+
mod snapshot;
pub use snapshot::*;
diff --git a/crates/paimon/src/spec/schema_change.rs b/crates/paimon/src/spec/schema_change.rs
new file mode 100644
index 0000000..9597407
--- /dev/null
+++ b/crates/paimon/src/spec/schema_change.rs
@@ -0,0 +1,431 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::spec::DataType;
+use serde::{Deserialize, Serialize};
+
+/// Schema change to table.
+///
+/// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java#L36>
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum SchemaChange {
+ /// A SchemaChange to set a table option.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java#L95>
+ SetOption { key: String, value: String },
+ /// A SchemaChange to remove a table option.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java#L134>
+ RemoveOption { key: String },
+ /// A SchemaChange to update a table comment.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java#L167>
+ UpdateComment { comment: Option<String> },
+ /// A SchemaChange to add a new field.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java#L201>
+ #[serde(rename_all = "camelCase")]
+ AddColumn {
+ field_name: String,
+ data_type: DataType,
+ description: Option<String>,
+ #[serde(rename = "move")]
+ column_move: Option<ColumnMove>,
+ },
+ /// A SchemaChange to rename a field.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java#L260>
+ #[serde(rename_all = "camelCase")]
+ RenameColumn {
+ field_name: String,
+ new_name: String,
+ },
+ /// A SchemaChange to drop a field.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java#L302>
+ #[serde(rename_all = "camelCase")]
+ DropColumn { field_name: String },
+ /// A SchemaChange to update the field's type.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java#L335>
+ #[serde(rename_all = "camelCase")]
+ UpdateColumnType {
+ field_name: String,
+ data_type: DataType,
+ },
+ /// A SchemaChange to update the field's position.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java#L377>
+ #[serde(rename_all = "camelCase")]
+ UpdateColumnPosition {
+ #[serde(rename = "move")]
+ column_move: ColumnMove,
+ },
+ /// A SchemaChange to update the field's nullability.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java#L470>
+ #[serde(rename_all = "camelCase")]
+ UpdateColumnNullability {
+ field_name: Vec<String>,
+ nullable: bool,
+ },
+ /// A SchemaChange to update the (nested) field's comment.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java#L512>
+ #[serde(rename_all = "camelCase")]
+ UpdateColumnComment {
+ field_names: Vec<String>,
+ new_description: String,
+ },
+}
+
+impl SchemaChange {
+ /// Creates a `SetOption` change that assigns `value` to the table option `key`.
+ pub fn set_option(key: String, value: String) -> Self {
+ SchemaChange::SetOption { key, value }
+ }
+
+ /// Creates a `RemoveOption` change for the table option `key`.
+ pub fn remove_option(key: String) -> Self {
+ SchemaChange::RemoveOption { key }
+ }
+
+ /// Creates an `UpdateComment` change carrying the given optional table comment.
+ pub fn update_comment(comment: Option<String>) -> Self {
+ SchemaChange::UpdateComment { comment }
+ }
+
+ /// Creates an `AddColumn` change with no description and no column move.
+ pub fn add_column(field_name: String, data_type: DataType) -> Self {
+ SchemaChange::AddColumn {
+ field_name,
+ data_type,
+ description: None,
+ column_move: None,
+ }
+ }
+
+ /// Creates an `AddColumn` change with a description and no column move.
+ pub fn add_column_with_description(
+ field_name: String,
+ data_type: DataType,
+ description: String,
+ ) -> Self {
+ SchemaChange::AddColumn {
+ field_name,
+ data_type,
+ description: Some(description),
+ column_move: None,
+ }
+ }
+
+ /// Creates an `AddColumn` change with both a description and a column move.
+ pub fn add_column_with_description_and_column_move(
+ field_name: String,
+ data_type: DataType,
+ description: String,
+ column_move: ColumnMove,
+ ) -> Self {
+ SchemaChange::AddColumn {
+ field_name,
+ data_type,
+ description: Some(description),
+ column_move: Some(column_move),
+ }
+ }
+
+ /// Creates a `RenameColumn` change from `field_name` to `new_name`.
+ pub fn rename_column(field_name: String, new_name: String) -> Self {
+ SchemaChange::RenameColumn {
+ field_name,
+ new_name,
+ }
+ }
+
+ /// Creates a `DropColumn` change for `field_name`.
+ pub fn drop_column(field_name: String) -> Self {
+ SchemaChange::DropColumn { field_name }
+ }
+
+ /// Creates an `UpdateColumnType` change that sets `field_name` to `new_data_type`.
+ pub fn update_column_type(field_name: String, new_data_type: DataType) ->
Self {
+ SchemaChange::UpdateColumnType {
+ field_name,
+ data_type: new_data_type,
+ }
+ }
+
+ /// Creates an `UpdateColumnPosition` change from the given `ColumnMove`.
+ pub fn update_column_position(column_move: ColumnMove) -> Self {
+ SchemaChange::UpdateColumnPosition { column_move }
+ }
+
+ /// Creates an `UpdateColumnNullability` change for a single field name.
+ pub fn update_column_nullability(field_name: String, new_nullability:
bool) -> Self {
+ SchemaChange::UpdateColumnNullability {
+ field_name: vec![field_name],
+ nullable: new_nullability,
+ }
+ }
+
+ /// Creates an `UpdateColumnNullability` change for the given list of field names.
+ pub fn update_columns_nullability(field_names: Vec<String>,
new_nullability: bool) -> Self {
+ SchemaChange::UpdateColumnNullability {
+ field_name: field_names,
+ nullable: new_nullability,
+ }
+ }
+
+ /// Creates an `UpdateColumnComment` change for a single field name.
+ pub fn update_column_comment(field_name: String, comment: String) -> Self {
+ SchemaChange::UpdateColumnComment {
+ field_names: vec![field_name],
+ new_description: comment,
+ }
+ }
+
+ /// Creates an `UpdateColumnComment` change for the given list of field names.
+ pub fn update_columns_comment(field_names: Vec<String>, comment: String)
-> Self {
+ SchemaChange::UpdateColumnComment {
+ field_names,
+ new_description: comment,
+ }
+ }
+}
+
+/// The type of move.
+///
+/// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java#L412>
+#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
+pub enum ColumnMoveType {
+ FIRST,
+ AFTER,
+}
+
+/// Represents a requested column move in a struct.
+///
+/// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java#L410>
+#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ColumnMove {
+ pub field_name: String,
+ pub referenced_field_name: Option<String>,
+ #[serde(rename = "type")]
+ pub move_type: ColumnMoveType,
+}
+
+impl ColumnMove {
+ /// Get the field name.
+ pub fn field_name(&self) -> &str {
+ &self.field_name
+ }
+
+ /// Get the referenced field name.
+ pub fn referenced_field_name(&self) -> Option<&str> {
+ self.referenced_field_name.as_deref()
+ }
+
+ /// Get the move type.
+ pub fn move_type(&self) -> &ColumnMoveType {
+ &self.move_type
+ }
+
+ /// Create a new `ColumnMove` with the `FIRST` move type.
+ pub fn move_first(field_name: String) -> Self {
+ ColumnMove {
+ field_name,
+ referenced_field_name: None,
+ move_type: ColumnMoveType::FIRST,
+ }
+ }
+
+ /// Create a new `ColumnMove` with the `AFTER` move type.
+ pub fn move_after(field_name: String, referenced_field_name: String) ->
Self {
+ ColumnMove {
+ field_name,
+ referenced_field_name: Some(referenced_field_name),
+ move_type: ColumnMoveType::AFTER,
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::spec::{DoubleType, IntType};
+
+ #[test]
+ fn test_schema_change_serialize_deserialize() {
+ let json_data = r#"
+ [
+ {
+ "setOption": {
+ "key": "snapshot.time-retained",
+ "value": "2h"
+ }
+ },
+ {
+ "removeOption": {
+ "key": "compaction.max.file-num"
+ }
+ },
+ {
+ "updateComment": {
+ "comment": "table.comment"
+ }
+ },
+ {
+ "addColumn": {
+ "fieldName": "col1",
+ "dataType": "INTEGER",
+ "description": "col1_description",
+ "move": {
+ "fieldName": "col1_first",
+ "referencedFieldName": null,
+ "type": "FIRST"
+ }
+ }
+ },
+ {
+ "renameColumn": {
+ "fieldName": "col3",
+ "newName": "col3_new_name"
+ }
+ },
+ {
+ "dropColumn": {
+ "fieldName": "col1"
+ }
+ },
+ {
+ "updateColumnType": {
+ "fieldName": "col14",
+ "dataType": "DOUBLE"
+ }
+ },
+ {
+ "updateColumnPosition": {
+ "move": {
+ "fieldName": "col4_first",
+ "referencedFieldName": null,
+ "type": "FIRST"
+ }
+ }
+ },
+ {
+ "updateColumnNullability": {
+ "fieldName": [
+ "col5",
+ "f2"
+ ],
+ "nullable": false
+ }
+ },
+ {
+ "updateColumnComment": {
+ "fieldNames": [
+ "col5",
+ "f1"
+ ],
+ "newDescription": "col5 f1 field"
+ }
+ }
+ ]"#;
+
+ let schema_changes: Vec<SchemaChange> =
+ serde_json::from_str(json_data).expect("Failed to deserialize
SchemaChange.");
+
+ assert_eq!(
+ schema_changes,
+ vec![
+ SchemaChange::SetOption {
+ key: "snapshot.time-retained".to_string(),
+ value: "2h".to_string(),
+ },
+ SchemaChange::RemoveOption {
+ key: "compaction.max.file-num".to_string(),
+ },
+ SchemaChange::UpdateComment {
+ comment: Some("table.comment".to_string()),
+ },
+ SchemaChange::AddColumn {
+ field_name: "col1".to_string(),
+ data_type: DataType::Int(IntType::new()),
+ description: Some("col1_description".to_string()),
+ column_move: Some(ColumnMove {
+ field_name: "col1_first".to_string(),
+ referenced_field_name: None,
+ move_type: ColumnMoveType::FIRST,
+ }),
+ },
+ SchemaChange::RenameColumn {
+ field_name: "col3".to_string(),
+ new_name: "col3_new_name".to_string(),
+ },
+ SchemaChange::DropColumn {
+ field_name: "col1".to_string(),
+ },
+ SchemaChange::UpdateColumnType {
+ field_name: "col14".to_string(),
+ data_type: DataType::Double(DoubleType::new()),
+ },
+ SchemaChange::UpdateColumnPosition {
+ column_move: ColumnMove {
+ field_name: "col4_first".to_string(),
+ referenced_field_name: None,
+ move_type: ColumnMoveType::FIRST,
+ },
+ },
+ SchemaChange::UpdateColumnNullability {
+ field_name: vec!["col5".to_string(), "f2".to_string()],
+ nullable: false,
+ },
+ SchemaChange::UpdateColumnComment {
+ field_names: vec!["col5".to_string(), "f1".to_string()],
+ new_description: "col5 f1 field".to_string(),
+ },
+ ]
+ );
+ }
+
+ #[test]
+ fn test_column_move_serialize_deserialize() {
+ let json_data = r#"
+ [
+ {
+ "fieldName": "col1",
+ "referencedFieldName": null,
+ "type": "FIRST"
+ },
+ {
+ "fieldName": "col2_after",
+ "referencedFieldName": "col2",
+ "type": "AFTER"
+ }
+ ]"#;
+
+ let column_moves: Vec<ColumnMove> =
serde_json::from_str(json_data).unwrap();
+ assert_eq!(
+ column_moves,
+ vec![
+ ColumnMove::move_first("col1".to_string()),
+ ColumnMove::move_after("col2_after".to_string(),
"col2".to_string()),
+ ]
+ );
+ }
+}