This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 3296926 feat: introduce schema (#90)
3296926 is described below
commit 32969261ec4ef117f40ebe03ba3b24aa8feb3888
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Feb 26 12:26:16 2026 +0800
feat: introduce schema (#90)
---
crates/paimon/src/spec/schema.rs | 463 ++++++++++++++++++++++++++++++++++++++-
crates/paimon/src/spec/types.rs | 74 ++++++-
2 files changed, 533 insertions(+), 4 deletions(-)
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index 4266421..1729ea3 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use crate::spec::types::DataType;
+use crate::spec::types::{DataType, RowType};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
/// The table schema for paimon table.
///
@@ -102,6 +102,344 @@ pub fn escape_single_quotes(text: &str) -> String {
text.replace('\'', "''")
}
+// ======================= Schema (DDL) ===============================
+
+/// Table-option key through which the primary key columns may be given as a
+/// comma-separated list; mirrors
+/// [CoreOptions.PRIMARY_KEY](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java).
+pub const PRIMARY_KEY_OPTION: &str = "primary-key";
+/// Table-option key through which the partition columns may be given as a
+/// comma-separated list; mirrors
+/// [CoreOptions.PARTITION](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java).
+pub const PARTITION_OPTION: &str = "partition";
+
+/// Schema of a table (logical DDL schema).
+///
+/// Corresponds to
+/// [org.apache.paimon.schema.Schema](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java).
+///
+/// Construct it through [`Schema::new`] or [`Schema::builder`], which validate the
+/// columns and keys and normalize primary key columns to NOT NULL. Note that the
+/// derived serde `Deserialize` fills the fields directly and does NOT run that
+/// validation.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Schema {
+    /// Table columns, in declaration order.
+    fields: Vec<DataField>,
+    /// Names of the partition key columns.
+    partition_keys: Vec<String>,
+    /// Names of the primary key columns.
+    primary_keys: Vec<String>,
+    /// Table options (the key-defining options are removed by `Schema::new`).
+    options: HashMap<String, String>,
+    /// Optional table comment.
+    comment: Option<String>,
+}
+
+impl Schema {
+    /// Builds a validated, normalized schema.
+    ///
+    /// Partition and primary keys may be given either directly or through the
+    /// [`PARTITION_OPTION`] / [`PRIMARY_KEY_OPTION`] table options as
+    /// comma-separated lists. When such an option is present it is removed from
+    /// the stored options. Primary key columns are rewritten as NOT NULL.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`crate::Error::ConfigInvalid`] when:
+    /// - a key list is defined both in DDL and in table options,
+    /// - field names, partition keys or primary keys contain duplicates,
+    /// - a partition or primary key does not name an existing column.
+    ///
+    /// Errors from [`DataType::copy_with_nullable`] are propagated as well.
+    pub fn new(
+        fields: Vec<DataField>,
+        partition_keys: Vec<String>,
+        primary_keys: Vec<String>,
+        mut options: HashMap<String, String>,
+        comment: Option<String>,
+    ) -> crate::Result<Self> {
+        let primary_keys = Self::normalize_primary_keys(primary_keys, &mut options)?;
+        let partition_keys = Self::normalize_partition_keys(partition_keys, &mut options)?;
+        let fields = Self::normalize_fields(fields, &partition_keys, &primary_keys)?;
+
+        Ok(Self {
+            fields,
+            partition_keys,
+            primary_keys,
+            options,
+            comment,
+        })
+    }
+
+    /// Resolves primary keys from DDL or the `primary-key` option, removing the option.
+    /// Corresponds to Java `normalizePrimaryKeys`.
+    fn normalize_primary_keys(
+        primary_keys: Vec<String>,
+        options: &mut HashMap<String, String>,
+    ) -> crate::Result<Vec<String>> {
+        Self::resolve_keys(
+            primary_keys,
+            options,
+            PRIMARY_KEY_OPTION,
+            "Cannot define primary key on DDL and table options at the same time.",
+        )
+    }
+
+    /// Resolves partition keys from DDL or the `partition` option, removing the option.
+    /// Corresponds to Java `normalizePartitionKeys`.
+    fn normalize_partition_keys(
+        partition_keys: Vec<String>,
+        options: &mut HashMap<String, String>,
+    ) -> crate::Result<Vec<String>> {
+        Self::resolve_keys(
+            partition_keys,
+            options,
+            PARTITION_OPTION,
+            "Cannot define partition on DDL and table options at the same time.",
+        )
+    }
+
+    /// Shared logic of the two key normalizers: removes `option_key` from
+    /// `options`; if it was set, parses it as a comma-separated list, failing with
+    /// `conflict_message` when `ddl_keys` is also non-empty.
+    fn resolve_keys(
+        ddl_keys: Vec<String>,
+        options: &mut HashMap<String, String>,
+        option_key: &str,
+        conflict_message: &str,
+    ) -> crate::Result<Vec<String>> {
+        match options.remove(option_key) {
+            // Not configured through options: keep whatever DDL provided.
+            None => Ok(ddl_keys),
+            Some(_) if !ddl_keys.is_empty() => Err(crate::Error::ConfigInvalid {
+                message: conflict_message.to_string(),
+            }),
+            // Entries are trimmed; empty segments (e.g. a trailing comma) are dropped.
+            Some(raw) => Ok(raw
+                .split(',')
+                .map(str::trim)
+                .filter(|key| !key.is_empty())
+                .map(str::to_string)
+                .collect()),
+        }
+    }
+
+    /// Validates fields and keys, then makes nullable primary key columns NOT NULL.
+    /// Corresponds to Java `normalizeFields`.
+    fn normalize_fields(
+        fields: Vec<DataField>,
+        partition_keys: &[String],
+        primary_keys: &[String],
+    ) -> crate::Result<Vec<DataField>> {
+        let field_names: Vec<String> = fields.iter().map(|f| f.name().to_string()).collect();
+        Self::validate_no_duplicate_fields(&field_names)?;
+        Self::validate_partition_keys(&field_names, partition_keys)?;
+        Self::validate_primary_keys(&field_names, primary_keys)?;
+
+        if primary_keys.is_empty() {
+            return Ok(fields);
+        }
+
+        let pk_set: HashSet<&str> = primary_keys.iter().map(String::as_str).collect();
+        let mut normalized = Vec::with_capacity(fields.len());
+        for field in fields {
+            if pk_set.contains(field.name()) && field.data_type().is_nullable() {
+                let data_type = field.data_type().copy_with_nullable(false)?;
+                normalized.push(
+                    DataField::new(field.id(), field.name().to_string(), data_type)
+                        .with_description(field.description().map(|s| s.to_string())),
+                );
+            } else {
+                // Already NOT NULL or not a key column: keep as-is without copying.
+                normalized.push(field);
+            }
+        }
+        Ok(normalized)
+    }
+
+    /// Table columns must not contain duplicate field names.
+    fn validate_no_duplicate_fields(field_names: &[String]) -> crate::Result<()> {
+        let duplicates = Self::duplicate_fields(field_names);
+        if !duplicates.is_empty() {
+            return Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "Table column {field_names:?} must not contain duplicate fields. Found: {duplicates:?}"
+                ),
+            });
+        }
+        Ok(())
+    }
+
+    /// Partition keys must be unique and must all name existing table columns.
+    fn validate_partition_keys(
+        field_names: &[String],
+        partition_keys: &[String],
+    ) -> crate::Result<()> {
+        let duplicates = Self::duplicate_fields(partition_keys);
+        if !duplicates.is_empty() {
+            return Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "Partition key constraint {partition_keys:?} must not contain duplicate columns. Found: {duplicates:?}"
+                ),
+            });
+        }
+        let all_names: HashSet<&str> = field_names.iter().map(String::as_str).collect();
+        if !partition_keys
+            .iter()
+            .all(|k| all_names.contains(k.as_str()))
+        {
+            return Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "Table column {field_names:?} should include all partition fields {partition_keys:?}"
+                ),
+            });
+        }
+        Ok(())
+    }
+
+    /// Primary keys must be unique and must all name existing table columns.
+    fn validate_primary_keys(field_names: &[String], primary_keys: &[String]) -> crate::Result<()> {
+        if primary_keys.is_empty() {
+            return Ok(());
+        }
+        let duplicates = Self::duplicate_fields(primary_keys);
+        if !duplicates.is_empty() {
+            return Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "Primary key constraint {primary_keys:?} must not contain duplicate columns. Found: {duplicates:?}"
+                ),
+            });
+        }
+        let all_names: HashSet<&str> = field_names.iter().map(String::as_str).collect();
+        if !primary_keys.iter().all(|k| all_names.contains(k.as_str())) {
+            return Err(crate::Error::ConfigInvalid {
+                message: format!(
+                    "Table column {field_names:?} should include all primary key constraint {primary_keys:?}"
+                ),
+            });
+        }
+        Ok(())
+    }
+
+    /// Returns the set of names that appear more than once in `names`.
+    pub fn duplicate_fields(names: &[String]) -> HashSet<String> {
+        // Count with borrowed keys; only the duplicates are turned into owned Strings.
+        let mut counts: HashMap<&str, usize> = HashMap::with_capacity(names.len());
+        for name in names {
+            *counts.entry(name.as_str()).or_default() += 1;
+        }
+        counts
+            .into_iter()
+            .filter(|&(_, count)| count > 1)
+            .map(|(name, _)| name.to_string())
+            .collect()
+    }
+
+    /// Row type with these fields (nullable = false for table row).
+    pub fn row_type(&self) -> RowType {
+        RowType::with_nullable(false, self.fields.clone())
+    }
+
+    /// The table columns.
+    pub fn fields(&self) -> &[DataField] {
+        &self.fields
+    }
+
+    /// The partition key column names.
+    pub fn partition_keys(&self) -> &[String] {
+        &self.partition_keys
+    }
+
+    /// The primary key column names.
+    pub fn primary_keys(&self) -> &[String] {
+        &self.primary_keys
+    }
+
+    /// The table options.
+    pub fn options(&self) -> &HashMap<String, String> {
+        &self.options
+    }
+
+    /// The table comment, if any.
+    pub fn comment(&self) -> Option<&str> {
+        self.comment.as_deref()
+    }
+
+    /// Creates a new schema with the same keys/options/comment but the fields of
+    /// `row_type`, re-running the validation of [`Schema::new`].
+    pub fn copy(&self, row_type: RowType) -> crate::Result<Self> {
+        Self::new(
+            row_type.fields().to_vec(),
+            self.partition_keys.clone(),
+            self.primary_keys.clone(),
+            self.options.clone(),
+            self.comment.clone(),
+        )
+    }
+
+    /// Creates a new builder for configuring a schema.
+    pub fn builder() -> SchemaBuilder {
+        SchemaBuilder::new()
+    }
+}
+
+/// Fluent builder for [`Schema`].
+///
+/// Columns receive sequential field IDs in the order they are added; validation
+/// and normalization happen in [`SchemaBuilder::build`].
+#[derive(Debug, Clone)]
+pub struct SchemaBuilder {
+    /// Columns added so far, in insertion order.
+    columns: Vec<DataField>,
+    /// Partition key column names.
+    partition_keys: Vec<String>,
+    /// Primary key column names.
+    primary_keys: Vec<String>,
+    /// Table options accumulated so far.
+    options: HashMap<String, String>,
+    /// Optional table comment.
+    comment: Option<String>,
+    /// Field ID that the next added column will receive.
+    next_field_id: i32,
+}
+
+impl SchemaBuilder {
+    /// Creates an empty builder; the first column added gets field ID 0.
+    pub fn new() -> Self {
+        Self {
+            columns: Vec::new(),
+            partition_keys: Vec::new(),
+            primary_keys: Vec::new(),
+            options: HashMap::new(),
+            comment: None,
+            next_field_id: 0,
+        }
+    }
+
+    /// Adds a column (name, data type) without a description.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `data_type` is or contains a [`RowType`]; see
+    /// [`SchemaBuilder::column_with_description`].
+    pub fn column(self, column_name: impl Into<String>, data_type: DataType) -> Self {
+        self.column_with_description(column_name, data_type, None)
+    }
+
+    /// Adds a column with an optional description, assigning it the next field ID.
+    ///
+    /// TODO: Support RowType in schema columns with field ID assignment for nested fields.
+    /// See <https://github.com/apache/paimon/pull/1547>.
+    ///
+    /// # Panics
+    ///
+    /// Panics (via `todo!`) if `data_type` is or contains a [`RowType`], because
+    /// field ID assignment for nested row fields is not implemented yet.
+    pub fn column_with_description(
+        mut self,
+        column_name: impl Into<String>,
+        data_type: DataType,
+        description: Option<String>,
+    ) -> Self {
+        if data_type.contains_row_type() {
+            todo!(
+                "Column type containing RowType is not supported yet: field ID assignment for nested row fields is not implemented. See https://github.com/apache/paimon/pull/1547"
+            );
+        }
+        let name = column_name.into();
+        let id = self.next_field_id;
+        self.next_field_id += 1;
+        self.columns
+            .push(DataField::new(id, name, data_type).with_description(description));
+        self
+    }
+
+    /// Sets the partition keys, replacing any previously set.
+    pub fn partition_keys(mut self, names: impl IntoIterator<Item = impl Into<String>>) -> Self {
+        self.partition_keys = names.into_iter().map(Into::into).collect();
+        self
+    }
+
+    /// Sets the primary key columns, replacing any previously set.
+    ///
+    /// The columns may be declared nullable: [`SchemaBuilder::build`] rewrites
+    /// the type of every primary key column as NOT NULL.
+    pub fn primary_key(mut self, names: impl IntoIterator<Item = impl Into<String>>) -> Self {
+        self.primary_keys = names.into_iter().map(Into::into).collect();
+        self
+    }
+
+    /// Adds table options, overwriting existing entries that have the same key.
+    pub fn options(mut self, opts: impl IntoIterator<Item = (String, String)>) -> Self {
+        self.options.extend(opts);
+        self
+    }
+
+    /// Sets a single table option, overwriting any existing value for `key`.
+    pub fn option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
+        self.options.insert(key.into(), value.into());
+        self
+    }
+
+    /// Sets (or, with `None`, clears) the table comment.
+    pub fn comment(mut self, comment: Option<String>) -> Self {
+        self.comment = comment;
+        self
+    }
+
+    /// Builds the schema via [`Schema::new`], which validates and normalizes it.
+    ///
+    /// # Errors
+    ///
+    /// Returns the same errors as [`Schema::new`] (duplicate or unknown key
+    /// columns, keys defined in both DDL and options, ...).
+    pub fn build(self) -> crate::Result<Schema> {
+        Schema::new(
+            self.columns,
+            self.partition_keys,
+            self.primary_keys,
+            self.options,
+            self.comment,
+        )
+    }
+}
+
+impl Default for SchemaBuilder {
+    /// Same as [`SchemaBuilder::new`]: an empty builder with no columns or options.
+    fn default() -> Self {
+        SchemaBuilder::new()
+    }
+}
+
#[cfg(test)]
mod tests {
use crate::spec::IntType;
@@ -170,4 +508,125 @@ mod tests {
let escaped_text = escape_single_quotes("text with 'single' quotes");
assert_eq!(escaped_text, "text with ''single'' quotes");
}
+
+    #[test]
+    fn test_schema_builder_build() {
+        let built = Schema::builder()
+            .column("id", DataType::Int(IntType::with_nullable(true)))
+            .column("name", DataType::Int(IntType::new()))
+            .primary_key(["id"])
+            .option("k", "v")
+            .comment(Some("table comment".into()))
+            .build()
+            .unwrap();
+
+        assert_eq!(built.fields().len(), 2);
+        assert_eq!(built.primary_keys(), &["id"]);
+        assert_eq!(built.options().get("k").map(String::as_str), Some("v"));
+        assert_eq!(built.comment(), Some("table comment"));
+
+        // The nullable `id` column must come back NOT NULL because it is a primary key.
+        let pk_field = built
+            .fields()
+            .iter()
+            .find(|field| field.name() == "id")
+            .unwrap();
+        assert!(
+            !pk_field.data_type().is_nullable(),
+            "primary key column should be normalized to NOT NULL"
+        );
+    }
+
+    #[test]
+    fn test_schema_validation() {
+        /// Asserts that building failed with a configuration error specifically,
+        /// not merely with any error.
+        fn assert_config_invalid(res: crate::Result<Schema>, reason: &str) {
+            assert!(
+                matches!(res, Err(crate::Error::ConfigInvalid { .. })),
+                "{reason}; got {res:?}"
+            );
+        }
+
+        // Duplicate field names
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::new()))
+            .column("b", DataType::Int(IntType::new()))
+            .column("a", DataType::Int(IntType::new()))
+            .build();
+        assert_config_invalid(res, "duplicate field names should be rejected");
+
+        // Duplicate partition keys
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::new()))
+            .column("b", DataType::Int(IntType::new()))
+            .partition_keys(["a", "a"])
+            .build();
+        assert_config_invalid(res, "duplicate partition keys should be rejected");
+
+        // Partition key not in fields
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::new()))
+            .column("b", DataType::Int(IntType::new()))
+            .partition_keys(["c"])
+            .build();
+        assert_config_invalid(res, "partition key not in columns should be rejected");
+
+        // Duplicate primary keys
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::with_nullable(false)))
+            .column("b", DataType::Int(IntType::new()))
+            .primary_key(["a", "a"])
+            .build();
+        assert_config_invalid(res, "duplicate primary keys should be rejected");
+
+        // Primary key not in fields
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::with_nullable(false)))
+            .column("b", DataType::Int(IntType::new()))
+            .primary_key(["c"])
+            .build();
+        assert_config_invalid(res, "primary key not in columns should be rejected");
+
+        // primary-key in options and DDL at same time
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::with_nullable(false)))
+            .column("b", DataType::Int(IntType::new()))
+            .primary_key(["a"])
+            .option(PRIMARY_KEY_OPTION, "a")
+            .build();
+        assert_config_invalid(
+            res,
+            "primary key defined in both DDL and options should be rejected",
+        );
+
+        // partition in options and DDL at same time
+        let res = Schema::builder()
+            .column("a", DataType::Int(IntType::new()))
+            .column("b", DataType::Int(IntType::new()))
+            .partition_keys(["a"])
+            .option(PARTITION_OPTION, "a")
+            .build();
+        assert_config_invalid(
+            res,
+            "partition defined in both DDL and options should be rejected",
+        );
+
+        // Valid: partition keys and primary key subset of fields
+        let schema = Schema::builder()
+            .column("a", DataType::Int(IntType::with_nullable(false)))
+            .column("b", DataType::Int(IntType::new()))
+            .column("c", DataType::Int(IntType::new()))
+            .partition_keys(["a"])
+            .primary_key(["a", "b"])
+            .build()
+            .unwrap();
+        assert_eq!(schema.partition_keys(), &["a"]);
+        assert_eq!(schema.primary_keys(), &["a", "b"]);
+
+        // Valid: keys taken from table options are trimmed, empty entries are
+        // dropped, and the key-defining options are removed from the schema options.
+        let schema = Schema::builder()
+            .column("a", DataType::Int(IntType::new()))
+            .column("b", DataType::Int(IntType::new()))
+            .option(PRIMARY_KEY_OPTION, " a , b ,")
+            .option(PARTITION_OPTION, "a")
+            .build()
+            .unwrap();
+        assert_eq!(schema.primary_keys(), &["a", "b"]);
+        assert_eq!(schema.partition_keys(), &["a"]);
+        assert!(schema.options().get(PRIMARY_KEY_OPTION).is_none());
+        assert!(schema.options().get(PARTITION_OPTION).is_none());
+    }
+
+    /// Adding a column whose type is, or nests, a RowType must panic (via `todo!`)
+    /// until field ID assignment for nested row fields exists.
+    /// See <https://github.com/apache/paimon/pull/1547>.
+    #[test]
+    #[should_panic(expected = "RowType")]
+    fn test_schema_builder_column_row_type_panics() {
+        let nested_field = DataField::new(0, "nested".into(), DataType::Int(IntType::new()));
+        let payload_type = DataType::Row(RowType::new(vec![nested_field]));
+        let _ = Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("payload", payload_type);
+    }
}
diff --git a/crates/paimon/src/spec/types.rs b/crates/paimon/src/spec/types.rs
index ed7f4e8..815c7c6 100644
--- a/crates/paimon/src/spec/types.rs
+++ b/crates/paimon/src/spec/types.rs
@@ -103,9 +103,22 @@ pub enum DataType {
Row(RowType),
}
-#[allow(dead_code)]
impl DataType {
- fn is_nullable(&self) -> bool {
+    /// Returns whether this type is, or recursively contains, a [`RowType`].
+    ///
+    /// Array, multiset and map types are searched through their element / key /
+    /// value types. Used to reject schema columns that would require field ID
+    /// assignment for nested row fields, which is not yet implemented (see
+    /// <https://github.com/apache/paimon/pull/1547>).
+    pub fn contains_row_type(&self) -> bool {
+        // Every variant is listed explicitly (no `_` arm) so that adding a new
+        // nested type forces this function to be revisited.
+        match self {
+            DataType::Row(_) => true,
+            DataType::Array(v) => v.element_type.contains_row_type(),
+            DataType::Map(v) => v.key_type.contains_row_type() || v.value_type.contains_row_type(),
+            DataType::Multiset(v) => v.element_type.contains_row_type(),
+            DataType::Boolean(_)
+            | DataType::TinyInt(_)
+            | DataType::SmallInt(_)
+            | DataType::Int(_)
+            | DataType::BigInt(_)
+            | DataType::Decimal(_)
+            | DataType::Double(_)
+            | DataType::Float(_)
+            | DataType::Binary(_)
+            | DataType::VarBinary(_)
+            | DataType::Char(_)
+            | DataType::VarChar(_)
+            | DataType::Date(_)
+            | DataType::LocalZonedTimestamp(_)
+            | DataType::Time(_)
+            | DataType::Timestamp(_) => false,
+        }
+    }
+
+ /// Returns whether this type is nullable.
+ pub fn is_nullable(&self) -> bool {
match self {
DataType::Boolean(v) => v.nullable,
DataType::TinyInt(v) => v.nullable,
@@ -129,6 +142,59 @@ impl DataType {
DataType::Row(v) => v.nullable,
}
}
+
+    /// Returns a copy of this type whose top-level nullability is `nullable`.
+    ///
+    /// Only the outermost type is changed: element, key/value and row field types
+    /// are cloned unchanged. Corresponds to Java `DataType.copy(boolean nullable)`.
+    ///
+    /// # Errors
+    ///
+    /// Propagates the error of the parameterized type constructors (decimal,
+    /// binary, char, time and timestamp types) when they reject the copied
+    /// length/precision/scale.
+    pub fn copy_with_nullable(&self, nullable: bool) -> Result<Self> {
+        let copied = match self {
+            Self::Boolean(_) => Self::Boolean(BooleanType::with_nullable(nullable)),
+            Self::TinyInt(_) => Self::TinyInt(TinyIntType::with_nullable(nullable)),
+            Self::SmallInt(_) => Self::SmallInt(SmallIntType::with_nullable(nullable)),
+            Self::Int(_) => Self::Int(IntType::with_nullable(nullable)),
+            Self::BigInt(_) => Self::BigInt(BigIntType::with_nullable(nullable)),
+            Self::Decimal(decimal) => {
+                let (precision, scale) = (decimal.precision(), decimal.scale());
+                Self::Decimal(DecimalType::with_nullable(nullable, precision, scale)?)
+            }
+            Self::Double(_) => Self::Double(DoubleType::with_nullable(nullable)),
+            Self::Float(_) => Self::Float(FloatType::with_nullable(nullable)),
+            Self::Binary(binary) => {
+                Self::Binary(BinaryType::with_nullable(nullable, binary.length())?)
+            }
+            Self::VarBinary(var_binary) => {
+                Self::VarBinary(VarBinaryType::try_new(nullable, var_binary.length())?)
+            }
+            Self::Char(char_type) => {
+                Self::Char(CharType::with_nullable(nullable, char_type.length())?)
+            }
+            Self::VarChar(var_char) => {
+                Self::VarChar(VarCharType::with_nullable(nullable, var_char.length())?)
+            }
+            Self::Date(_) => Self::Date(DateType::with_nullable(nullable)),
+            Self::LocalZonedTimestamp(ts) => Self::LocalZonedTimestamp(
+                LocalZonedTimestampType::with_nullable(nullable, ts.precision())?,
+            ),
+            Self::Time(time) => Self::Time(TimeType::with_nullable(nullable, time.precision())?),
+            Self::Timestamp(ts) => {
+                Self::Timestamp(TimestampType::with_nullable(nullable, ts.precision())?)
+            }
+            Self::Array(array) => {
+                let element = array.element_type.as_ref().clone();
+                Self::Array(ArrayType::with_nullable(nullable, element))
+            }
+            Self::Map(map) => {
+                let key = map.key_type.as_ref().clone();
+                let value = map.value_type.as_ref().clone();
+                Self::Map(MapType::with_nullable(nullable, key, value))
+            }
+            Self::Multiset(multiset) => {
+                let element = multiset.element_type.as_ref().clone();
+                Self::Multiset(MultisetType::with_nullable(nullable, element))
+            }
+            Self::Row(row) => Self::Row(RowType::with_nullable(nullable, row.fields().to_vec())),
+        };
+        Ok(copied)
+    }
}
/// ArrayType for paimon.
@@ -1329,6 +1395,10 @@ impl RowType {
pub fn family(&self) -> DataTypeFamily {
DataTypeFamily::CONSTRUCTED
}
+
+    /// The fields of this row type, in declaration order.
+    pub fn fields(&self) -> &[DataField] {
+        &self.fields[..]
+    }
}
mod serde_utils {