This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 3296926  feat: introduce schema (#90)
3296926 is described below

commit 32969261ec4ef117f40ebe03ba3b24aa8feb3888
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Feb 26 12:26:16 2026 +0800

    feat: introduce schema (#90)
---
 crates/paimon/src/spec/schema.rs | 463 ++++++++++++++++++++++++++++++++++++++-
 crates/paimon/src/spec/types.rs  |  74 ++++++-
 2 files changed, 533 insertions(+), 4 deletions(-)

diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index 4266421..1729ea3 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -15,10 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::spec::types::DataType;
+use crate::spec::types::{DataType, RowType};
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 /// The table schema for paimon table.
 ///
@@ -102,6 +102,344 @@ pub fn escape_single_quotes(text: &str) -> String {
     text.replace('\'', "''")
 }
 
+// ======================= Schema (DDL) ===============================
+
+/// Option key for primary key in table options (same as 
[CoreOptions.PRIMARY_KEY](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java)).
+pub const PRIMARY_KEY_OPTION: &str = "primary-key";
+/// Option key for partition in table options (same as 
[CoreOptions.PARTITION](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java)).
+pub const PARTITION_OPTION: &str = "partition";
+
+/// Schema of a table (logical DDL schema).
+///
+/// Corresponds to 
[org.apache.paimon.schema.Schema](https://github.com/apache/paimon/blob/1.3/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java).
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Schema {
+    fields: Vec<DataField>,
+    partition_keys: Vec<String>,
+    primary_keys: Vec<String>,
+    options: HashMap<String, String>,
+    comment: Option<String>,
+}
+
+impl Schema {
+    /// Build a schema with validation. Normalizes partition/primary keys from 
options if present.
+    pub fn new(
+        fields: Vec<DataField>,
+        partition_keys: Vec<String>,
+        primary_keys: Vec<String>,
+        mut options: HashMap<String, String>,
+        comment: Option<String>,
+    ) -> crate::Result<Self> {
+        let primary_keys = Self::normalize_primary_keys(&primary_keys, &mut 
options)?;
+        let partition_keys = Self::normalize_partition_keys(&partition_keys, 
&mut options)?;
+        let fields = Self::normalize_fields(&fields, &partition_keys, 
&primary_keys)?;
+
+        Ok(Self {
+            fields,
+            partition_keys,
+            primary_keys,
+            options,
+            comment,
+        })
+    }
+
+    /// Normalize primary keys: optionally take from table options 
(`primary-key`), remove from options.
+    /// Corresponds to Java `normalizePrimaryKeys`.
+    fn normalize_primary_keys(
+        primary_keys: &[String],
+        options: &mut HashMap<String, String>,
+    ) -> crate::Result<Vec<String>> {
+        if let Some(pk) = options.remove(PRIMARY_KEY_OPTION) {
+            if !primary_keys.is_empty() {
+                return Err(crate::Error::ConfigInvalid {
+                    message: "Cannot define primary key on DDL and table 
options at the same time."
+                        .to_string(),
+                });
+            }
+            return Ok(pk
+                .split(',')
+                .map(|s| s.trim().to_string())
+                .filter(|s| !s.is_empty())
+                .collect());
+        }
+        Ok(primary_keys.to_vec())
+    }
+
+    /// Normalize partition keys: optionally take from table options 
(`partition`), remove from options.
+    /// Corresponds to Java `normalizePartitionKeys`.
+    fn normalize_partition_keys(
+        partition_keys: &[String],
+        options: &mut HashMap<String, String>,
+    ) -> crate::Result<Vec<String>> {
+        if let Some(part) = options.remove(PARTITION_OPTION) {
+            if !partition_keys.is_empty() {
+                return Err(crate::Error::ConfigInvalid {
+                    message: "Cannot define partition on DDL and table options 
at the same time."
+                        .to_string(),
+                });
+            }
+            return Ok(part
+                .split(',')
+                .map(|s| s.trim().to_string())
+                .filter(|s| !s.is_empty())
+                .collect());
+        }
+        Ok(partition_keys.to_vec())
+    }
+
+    /// Normalize fields: validate (duplicate/subset checks) and make primary 
key columns non-nullable.
+    /// Corresponds to Java `normalizeFields`.
+    fn normalize_fields(
+        fields: &[DataField],
+        partition_keys: &[String],
+        primary_keys: &[String],
+    ) -> crate::Result<Vec<DataField>> {
+        let field_names: Vec<String> = fields.iter().map(|f| 
f.name().to_string()).collect();
+        Self::validate_no_duplicate_fields(&field_names)?;
+        Self::validate_partition_keys(&field_names, partition_keys)?;
+        Self::validate_primary_keys(&field_names, primary_keys)?;
+
+        if primary_keys.is_empty() {
+            return Ok(fields.to_vec());
+        }
+
+        let pk_set: HashSet<&str> = 
primary_keys.iter().map(String::as_str).collect();
+        let mut new_fields = Vec::with_capacity(fields.len());
+        for f in fields {
+            if pk_set.contains(f.name()) && f.data_type().is_nullable() {
+                new_fields.push(
+                    DataField::new(
+                        f.id(),
+                        f.name().to_string(),
+                        f.data_type().copy_with_nullable(false)?,
+                    )
+                    .with_description(f.description().map(|s| s.to_string())),
+                );
+            } else {
+                new_fields.push(f.clone());
+            }
+        }
+        Ok(new_fields)
+    }
+
+    /// Table columns must not contain duplicate field names.
+    fn validate_no_duplicate_fields(field_names: &[String]) -> 
crate::Result<()> {
+        let duplicates = Self::duplicate_fields(field_names);
+        if duplicates.is_empty() {
+            Ok(())
+        } else {
+            Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "Table column {field_names:?} must not contain duplicate 
fields. Found: {duplicates:?}"
+                ),
+            })
+        }
+    }
+
+    /// Partition key constraint must not contain duplicates; all partition 
keys must be in table columns.
+    fn validate_partition_keys(
+        field_names: &[String],
+        partition_keys: &[String],
+    ) -> crate::Result<()> {
+        let all_names: HashSet<&str> = 
field_names.iter().map(String::as_str).collect();
+        let duplicates = Self::duplicate_fields(partition_keys);
+        if !duplicates.is_empty() {
+            return Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "Partition key constraint {partition_keys:?} must not 
contain duplicate columns. Found: {duplicates:?}"
+                ),
+            });
+        }
+        if !partition_keys
+            .iter()
+            .all(|k| all_names.contains(k.as_str()))
+        {
+            return Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "Table column {field_names:?} should include all partition 
fields {partition_keys:?}"
+                ),
+            });
+        }
+        Ok(())
+    }
+
+    /// Primary key constraint must not contain duplicates; all primary keys 
must be in table columns.
+    fn validate_primary_keys(field_names: &[String], primary_keys: &[String]) 
-> crate::Result<()> {
+        if primary_keys.is_empty() {
+            return Ok(());
+        }
+        let all_names: HashSet<&str> = 
field_names.iter().map(String::as_str).collect();
+        let duplicates = Self::duplicate_fields(primary_keys);
+        if !duplicates.is_empty() {
+            return Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "Primary key constraint {primary_keys:?} must not contain 
duplicate columns. Found: {duplicates:?}"
+                ),
+            });
+        }
+        if !primary_keys.iter().all(|k| all_names.contains(k.as_str())) {
+            return Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "Table column {field_names:?} should include all primary 
key constraint {primary_keys:?}"
+                ),
+            });
+        }
+        Ok(())
+    }
+
+    /// Returns the set of names that appear more than once.
+    pub fn duplicate_fields(names: &[String]) -> HashSet<String> {
+        let mut seen = HashMap::new();
+        for n in names {
+            *seen.entry(n.clone()).or_insert(0) += 1;
+        }
+        seen.into_iter()
+            .filter(|(_, count)| *count > 1)
+            .map(|(name, _)| name)
+            .collect()
+    }
+
+    /// Row type with these fields (nullable = false for table row).
+    pub fn row_type(&self) -> RowType {
+        RowType::with_nullable(false, self.fields.clone())
+    }
+
+    pub fn fields(&self) -> &[DataField] {
+        &self.fields
+    }
+
+    pub fn partition_keys(&self) -> &[String] {
+        &self.partition_keys
+    }
+
+    pub fn primary_keys(&self) -> &[String] {
+        &self.primary_keys
+    }
+
+    pub fn options(&self) -> &HashMap<String, String> {
+        &self.options
+    }
+
+    pub fn comment(&self) -> Option<&str> {
+        self.comment.as_deref()
+    }
+
+    /// Create a new schema with the same keys/options/comment but different 
row type
+    pub fn copy(&self, row_type: RowType) -> crate::Result<Self> {
+        Self::new(
+            row_type.fields().to_vec(),
+            self.partition_keys.clone(),
+            self.primary_keys.clone(),
+            self.options.clone(),
+            self.comment.clone(),
+        )
+    }
+
+    /// Create a new builder for configuring a schema.
+    pub fn builder() -> SchemaBuilder {
+        SchemaBuilder::new()
+    }
+}
+
+/// Builder for [`Schema`].
+pub struct SchemaBuilder {
+    columns: Vec<DataField>,
+    partition_keys: Vec<String>,
+    primary_keys: Vec<String>,
+    options: HashMap<String, String>,
+    comment: Option<String>,
+    next_field_id: i32,
+}
+
+impl SchemaBuilder {
+    pub fn new() -> Self {
+        Self {
+            columns: Vec::new(),
+            partition_keys: Vec::new(),
+            primary_keys: Vec::new(),
+            options: HashMap::new(),
+            comment: None,
+            next_field_id: 0,
+        }
+    }
+
+    /// Add a column (name, data type).
+    pub fn column(self, column_name: impl Into<String>, data_type: DataType) 
-> Self {
+        self.column_with_description(column_name, data_type, None)
+    }
+
+    /// Add a column with optional description.
+    ///
+    /// TODO: Support RowType in schema columns with field ID assignment for 
nested fields.
+    /// See <https://github.com/apache/paimon/pull/1547>.
+    pub fn column_with_description(
+        mut self,
+        column_name: impl Into<String>,
+        data_type: DataType,
+        description: Option<String>,
+    ) -> Self {
+        if data_type.contains_row_type() {
+            todo!(
+                "Column type containing RowType is not supported yet: field ID 
assignment for nested row fields is not implemented. See 
https://github.com/apache/paimon/pull/1547";
+            );
+        }
+        let name = column_name.into();
+        let id = self.next_field_id;
+        self.next_field_id += 1;
+        self.columns
+            .push(DataField::new(id, name, 
data_type).with_description(description));
+        self
+    }
+
+    /// Set partition keys.
+    pub fn partition_keys(mut self, names: impl IntoIterator<Item = impl 
Into<String>>) -> Self {
+        self.partition_keys = names.into_iter().map(Into::into).collect();
+        self
+    }
+
+    /// Set primary key columns. They must not be nullable.
+    pub fn primary_key(mut self, names: impl IntoIterator<Item = impl 
Into<String>>) -> Self {
+        self.primary_keys = names.into_iter().map(Into::into).collect();
+        self
+    }
+
+    /// Set table options (merged with existing).
+    pub fn options(mut self, opts: impl IntoIterator<Item = (String, String)>) 
-> Self {
+        self.options.extend(opts);
+        self
+    }
+
+    /// Set a single option.
+    pub fn option(mut self, key: impl Into<String>, value: impl Into<String>) 
-> Self {
+        self.options.insert(key.into(), value.into());
+        self
+    }
+
+    /// Set table comment.
+    pub fn comment(mut self, comment: Option<String>) -> Self {
+        self.comment = comment;
+        self
+    }
+
+    /// Build the schema (validates and normalizes).
+    pub fn build(self) -> crate::Result<Schema> {
+        Schema::new(
+            self.columns,
+            self.partition_keys,
+            self.primary_keys,
+            self.options,
+            self.comment,
+        )
+    }
+}
+
+impl Default for SchemaBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::spec::IntType;
@@ -170,4 +508,125 @@ mod tests {
         let escaped_text = escape_single_quotes("text with 'single' quotes");
         assert_eq!(escaped_text, "text with ''single'' quotes");
     }
+
+    #[test]
+    fn test_schema_builder_build() {
+        let schema = Schema::builder()
+            .column("id", DataType::Int(IntType::with_nullable(true)))
+            .column("name", DataType::Int(IntType::new()))
+            .primary_key(["id"])
+            .option("k", "v")
+            .comment(Some("table comment".into()))
+            .build()
+            .unwrap();
+        assert_eq!(schema.fields().len(), 2);
+        assert_eq!(schema.primary_keys(), &["id"]);
+        assert_eq!(schema.options().get("k"), Some(&"v".to_string()));
+        assert_eq!(schema.comment(), Some("table comment"));
+        let id_field = schema.fields().iter().find(|f| f.name() == 
"id").unwrap();
+        assert!(
+            !id_field.data_type().is_nullable(),
+            "primary key column should be normalized to NOT NULL"
+        );
+    }
+
+    #[test]
+    fn test_schema_validation() {
+        // Duplicate field names
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::new()))
+            .column("b", DataType::Int(IntType::new()))
+            .column("a", DataType::Int(IntType::new()))
+            .build();
+        assert!(res.is_err(), "duplicate field names should be rejected");
+
+        // Duplicate partition keys
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::new()))
+            .column("b", DataType::Int(IntType::new()))
+            .partition_keys(["a", "a"])
+            .build();
+        assert!(res.is_err(), "duplicate partition keys should be rejected");
+
+        // Partition key not in fields
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::new()))
+            .column("b", DataType::Int(IntType::new()))
+            .partition_keys(["c"])
+            .build();
+        assert!(
+            res.is_err(),
+            "partition key not in columns should be rejected"
+        );
+
+        // Duplicate primary keys
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::with_nullable(false)))
+            .column("b", DataType::Int(IntType::new()))
+            .primary_key(["a", "a"])
+            .build();
+        assert!(res.is_err(), "duplicate primary keys should be rejected");
+
+        // Primary key not in fields
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::with_nullable(false)))
+            .column("b", DataType::Int(IntType::new()))
+            .primary_key(["c"])
+            .build();
+        assert!(
+            res.is_err(),
+            "primary key not in columns should be rejected"
+        );
+
+        // primary-key in options and DDL at same time
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::with_nullable(false)))
+            .column("b", DataType::Int(IntType::new()))
+            .primary_key(["a"])
+            .option(PRIMARY_KEY_OPTION, "a")
+            .build();
+        assert!(
+            res.is_err(),
+            "primary key defined in both DDL and options should be rejected"
+        );
+
+        // partition in options and DDL at same time
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::new()))
+            .column("b", DataType::Int(IntType::new()))
+            .partition_keys(["a"])
+            .option(PARTITION_OPTION, "a")
+            .build();
+        assert!(
+            res.is_err(),
+            "partition defined in both DDL and options should be rejected"
+        );
+
+        // Valid: partition keys and primary key subset of fields
+        let schema = Schema::builder()
+            .column("a", DataType::Int(IntType::with_nullable(false)))
+            .column("b", DataType::Int(IntType::new()))
+            .column("c", DataType::Int(IntType::new()))
+            .partition_keys(["a"])
+            .primary_key(["a", "b"])
+            .build()
+            .unwrap();
+        assert_eq!(schema.partition_keys(), &["a"]);
+        assert_eq!(schema.primary_keys(), &["a", "b"]);
+    }
+
+    /// Adding a column whose type is or contains RowType panics (todo! until 
field ID assignment for nested row fields).
+    /// See <https://github.com/apache/paimon/pull/1547>.
+    #[test]
+    #[should_panic(expected = "RowType")]
+    fn test_schema_builder_column_row_type_panics() {
+        let row_type = RowType::new(vec![DataField::new(
+            0,
+            "nested".into(),
+            DataType::Int(IntType::new()),
+        )]);
+        Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("payload", DataType::Row(row_type));
+    }
 }
diff --git a/crates/paimon/src/spec/types.rs b/crates/paimon/src/spec/types.rs
index ed7f4e8..815c7c6 100644
--- a/crates/paimon/src/spec/types.rs
+++ b/crates/paimon/src/spec/types.rs
@@ -103,9 +103,22 @@ pub enum DataType {
     Row(RowType),
 }
 
-#[allow(dead_code)]
 impl DataType {
-    fn is_nullable(&self) -> bool {
+    /// Returns whether this type is or contains (recursively) a [`RowType`].
+    /// Used to reject schema columns that would require field ID assignment 
for nested row fields,
+    /// which is not yet implemented (see 
<https://github.com/apache/paimon/pull/1547>).
+    pub fn contains_row_type(&self) -> bool {
+        match self {
+            DataType::Row(_) => true,
+            DataType::Array(v) => v.element_type.contains_row_type(),
+            DataType::Map(v) => v.key_type.contains_row_type() || 
v.value_type.contains_row_type(),
+            DataType::Multiset(v) => v.element_type.contains_row_type(),
+            _ => false,
+        }
+    }
+
+    /// Returns whether this type is nullable.
+    pub fn is_nullable(&self) -> bool {
         match self {
             DataType::Boolean(v) => v.nullable,
             DataType::TinyInt(v) => v.nullable,
@@ -129,6 +142,59 @@ impl DataType {
             DataType::Row(v) => v.nullable,
         }
     }
+
+    /// Returns a copy of this type with the given nullability (top-level 
only).
+    /// Corresponds to Java `DataType.copy(boolean nullable)`.
+    pub fn copy_with_nullable(&self, nullable: bool) -> Result<Self> {
+        Ok(match self {
+            DataType::Boolean(_) => 
DataType::Boolean(BooleanType::with_nullable(nullable)),
+            DataType::TinyInt(_) => 
DataType::TinyInt(TinyIntType::with_nullable(nullable)),
+            DataType::SmallInt(_) => 
DataType::SmallInt(SmallIntType::with_nullable(nullable)),
+            DataType::Int(_) => 
DataType::Int(IntType::with_nullable(nullable)),
+            DataType::BigInt(_) => 
DataType::BigInt(BigIntType::with_nullable(nullable)),
+            DataType::Decimal(v) => 
DataType::Decimal(DecimalType::with_nullable(
+                nullable,
+                v.precision(),
+                v.scale(),
+            )?),
+            DataType::Double(_) => 
DataType::Double(DoubleType::with_nullable(nullable)),
+            DataType::Float(_) => 
DataType::Float(FloatType::with_nullable(nullable)),
+            DataType::Binary(v) => {
+                DataType::Binary(BinaryType::with_nullable(nullable, 
v.length())?)
+            }
+            DataType::VarBinary(v) => {
+                DataType::VarBinary(VarBinaryType::try_new(nullable, 
v.length())?)
+            }
+            DataType::Char(v) => 
DataType::Char(CharType::with_nullable(nullable, v.length())?),
+            DataType::VarChar(v) => {
+                DataType::VarChar(VarCharType::with_nullable(nullable, 
v.length())?)
+            }
+            DataType::Date(_) => 
DataType::Date(DateType::with_nullable(nullable)),
+            DataType::LocalZonedTimestamp(v) => DataType::LocalZonedTimestamp(
+                LocalZonedTimestampType::with_nullable(nullable, 
v.precision())?,
+            ),
+            DataType::Time(v) => 
DataType::Time(TimeType::with_nullable(nullable, v.precision())?),
+            DataType::Timestamp(v) => {
+                DataType::Timestamp(TimestampType::with_nullable(nullable, 
v.precision())?)
+            }
+            DataType::Array(v) => DataType::Array(ArrayType::with_nullable(
+                nullable,
+                v.element_type.as_ref().clone(),
+            )),
+            DataType::Map(v) => DataType::Map(MapType::with_nullable(
+                nullable,
+                v.key_type.as_ref().clone(),
+                v.value_type.as_ref().clone(),
+            )),
+            DataType::Multiset(v) => 
DataType::Multiset(MultisetType::with_nullable(
+                nullable,
+                v.element_type.as_ref().clone(),
+            )),
+            DataType::Row(v) => {
+                DataType::Row(RowType::with_nullable(nullable, 
v.fields().to_vec()))
+            }
+        })
+    }
 }
 
 /// ArrayType for paimon.
@@ -1329,6 +1395,10 @@ impl RowType {
     pub fn family(&self) -> DataTypeFamily {
         DataTypeFamily::CONSTRUCTED
     }
+
+    pub fn fields(&self) -> &[DataField] {
+        &self.fields
+    }
 }
 
 mod serde_utils {

Reply via email to