This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 7b3c89f feat(datafusion): Add DDL support with PRIMARY KEY constraint
syntax (#237)
7b3c89f is described below
commit 7b3c89fdde126d932ec2a716b726f50c8e4af719
Author: Jingsong Lee <[email protected]>
AuthorDate: Sun Apr 12 19:55:06 2026 +0800
feat(datafusion): Add DDL support with PRIMARY KEY constraint syntax (#237)
- Add PaimonDdlHandler for CREATE TABLE, ALTER TABLE (ADD/DROP/RENAME
COLUMN, RENAME TABLE)
- Add PaimonTableFactory for CREATE EXTERNAL TABLE via DataFusion
TableProviderFactory
- Extend PaimonCatalogProvider/SchemaProvider with CREATE/DROP SCHEMA and
DROP TABLE
- Add arrow_to_paimon_type and arrow_fields_to_paimon to paimon::arrow for
Arrow-to-Paimon type conversion
- Support PRIMARY KEY (col, ...) constraint syntax in CREATE TABLE DDL
---
crates/integrations/datafusion/src/catalog.rs | 77 ++
crates/integrations/datafusion/src/ddl.rs | 1234 +++++++++++++++++++++
crates/integrations/datafusion/src/lib.rs | 2 +
crates/integrations/datafusion/tests/ddl_tests.rs | 497 +++++++++
crates/paimon/src/arrow/mod.rs | 324 +++++-
crates/paimon/src/spec/schema.rs | 68 +-
6 files changed, 2186 insertions(+), 16 deletions(-)
diff --git a/crates/integrations/datafusion/src/catalog.rs
b/crates/integrations/datafusion/src/catalog.rs
index 626a47f..7ac66d4 100644
--- a/crates/integrations/datafusion/src/catalog.rs
+++ b/crates/integrations/datafusion/src/catalog.rs
@@ -18,6 +18,7 @@
//! Paimon catalog integration for DataFusion.
use std::any::Any;
+use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
@@ -86,6 +87,50 @@ impl CatalogProvider for PaimonCatalogProvider {
"paimon catalog access thread panicked",
)
}
+
+    /// Persists `name` as a new Paimon database and returns a catalog-backed
+    /// [`PaimonSchemaProvider`] for it.
+    ///
+    /// The provider supplied by DataFusion is not used. The database is created
+    /// with no properties and `ignore_if_exists = false`, so the catalog is not
+    /// asked to tolerate an already existing database.
+    fn register_schema(
+        &self,
+        name: &str,
+        _schema: Arc<dyn SchemaProvider>,
+    ) -> DFResult<Option<Arc<dyn SchemaProvider>>> {
+        let paimon_catalog = Arc::clone(&self.catalog);
+        let db_name = name.to_owned();
+        block_on_with_runtime(
+            async move {
+                let no_properties = HashMap::new();
+                if let Err(err) = paimon_catalog
+                    .create_database(&db_name, false, no_properties)
+                    .await
+                {
+                    return Err(to_datafusion_error(err));
+                }
+                let provider: Arc<dyn SchemaProvider> =
+                    Arc::new(PaimonSchemaProvider::new(Arc::clone(&paimon_catalog), db_name));
+                Ok(Some(provider))
+            },
+            "paimon catalog access thread panicked",
+        )
+    }
+
+    /// Drops the Paimon database `name` and returns a provider for it.
+    ///
+    /// Returns `Ok(None)` when the catalog does not list the database. DataFusion's
+    /// `DROP SCHEMA` turns `None` into its own "doesn't exist" error unless the
+    /// statement used `IF EXISTS`, so a missing database must not be reported as
+    /// a catalog error here (that would break `DROP SCHEMA IF EXISTS`).
+    /// `cascade` is forwarded to the catalog unchanged.
+    fn deregister_schema(
+        &self,
+        name: &str,
+        cascade: bool,
+    ) -> DFResult<Option<Arc<dyn SchemaProvider>>> {
+        let catalog = Arc::clone(&self.catalog);
+        let name = name.to_string();
+        block_on_with_runtime(
+            async move {
+                // Look the database up first (like `deregister_table` does for
+                // tables) so that "not found" maps to `None` instead of an error.
+                let exists = catalog
+                    .list_databases()
+                    .await
+                    .map_err(to_datafusion_error)?
+                    .iter()
+                    .any(|db| db == &name);
+                if !exists {
+                    return Ok(None);
+                }
+                catalog
+                    .drop_database(&name, false, cascade)
+                    .await
+                    .map_err(to_datafusion_error)?;
+                Ok(Some(
+                    Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), name))
+                        as Arc<dyn SchemaProvider>,
+                ))
+            },
+            "paimon catalog access thread panicked",
+        )
+    }
}
/// Represents a [`SchemaProvider`] for the Paimon [`Catalog`], managing
@@ -159,4 +204,36 @@ impl SchemaProvider for PaimonSchemaProvider {
"paimon catalog access thread panicked",
)
}
+
+    /// Acknowledges a table registration without touching the Paimon catalog.
+    ///
+    /// DataFusion calls this after the table has already been created (e.g. via
+    /// `CREATE EXTERNAL TABLE`), so nothing is persisted here and the given
+    /// provider is handed straight back.
+    /// NOTE(review): a direct `register_table` call on this schema therefore
+    /// stores nothing in the catalog.
+    fn register_table(
+        &self,
+        _name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> DFResult<Option<Arc<dyn TableProvider>>> {
+        let acknowledged = Some(table);
+        Ok(acknowledged)
+    }
+
+    /// Drops `name` from this schema's Paimon database and returns a provider
+    /// built from the table as it was loaded just before the drop.
+    ///
+    /// A table the catalog reports as not existing yields `Ok(None)`; every other
+    /// lookup, provider-construction or drop failure becomes a DataFusion error.
+    fn deregister_table(&self, name: &str) -> DFResult<Option<Arc<dyn TableProvider>>> {
+        let catalog = Arc::clone(&self.catalog);
+        let identifier = Identifier::new(self.database.clone(), name);
+        block_on_with_runtime(
+            async move {
+                // Load the table before dropping it so a provider can be returned.
+                let table = match catalog.get_table(&identifier).await {
+                    Err(paimon::Error::TableNotExist { .. }) => return Ok(None),
+                    other => other.map_err(to_datafusion_error)?,
+                };
+                let dropped: Arc<dyn TableProvider> =
+                    Arc::new(PaimonTableProvider::try_new(table)?);
+                catalog
+                    .drop_table(&identifier, false)
+                    .await
+                    .map_err(to_datafusion_error)?;
+                Ok(Some(dropped))
+            },
+            "paimon catalog access thread panicked",
+        )
+    }
}
diff --git a/crates/integrations/datafusion/src/ddl.rs
b/crates/integrations/datafusion/src/ddl.rs
new file mode 100644
index 0000000..97eb900
--- /dev/null
+++ b/crates/integrations/datafusion/src/ddl.rs
@@ -0,0 +1,1234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DDL support for Paimon tables.
+//!
+//! DataFusion does not natively support all DDL statements needed by Paimon.
+//! This module provides [`PaimonDdlHandler`] which intercepts CREATE TABLE and
+//! ALTER TABLE SQL, translates them to Paimon catalog operations, and
delegates
+//! everything else (SELECT, CREATE/DROP SCHEMA, DROP TABLE, etc.) to the
+//! underlying [`SessionContext`].
+//!
+//! Supported DDL:
+//! - `CREATE TABLE db.t (col TYPE, ..., PRIMARY KEY (col, ...)) [PARTITIONED
BY (col TYPE, ...)] [WITH ('key' = 'val')]`
+//! - `ALTER TABLE db.t ADD COLUMN col TYPE`
+//! - `ALTER TABLE db.t DROP COLUMN col`
+//! - `ALTER TABLE db.t RENAME COLUMN old TO new`
+//! - `ALTER TABLE db.t RENAME TO new_name`
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::StringArray;
+use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::{DataFusionError, Result as DFResult};
+use datafusion::prelude::{DataFrame, SessionContext};
+use datafusion::sql::sqlparser::ast::{
+ AlterTableOperation, ColumnDef, CreateTable, CreateTableOptions,
HiveDistributionStyle,
+ ObjectName, RenameTableNameKind, SqlOption, Statement,
+};
+use datafusion::sql::sqlparser::dialect::GenericDialect;
+use datafusion::sql::sqlparser::parser::Parser;
+use paimon::catalog::{Catalog, Identifier};
+use paimon::spec::SchemaChange;
+
+use crate::error::to_datafusion_error;
+use paimon::arrow::arrow_to_paimon_type;
+
+/// Wraps a [`SessionContext`] and a Paimon [`Catalog`] to handle DDL statements
+/// that DataFusion does not natively support for Paimon: `CREATE TABLE` and
+/// `ALTER TABLE` are translated into Paimon catalog operations.
+///
+/// For all other SQL, it delegates to the inner `SessionContext`.
+///
+/// # Example
+/// ```ignore
+/// let handler = PaimonDdlHandler::new(ctx, catalog, "paimon");
+/// let df = handler.sql("ALTER TABLE paimon.db.t ADD COLUMN age INT").await?;
+/// ```
+pub struct PaimonDdlHandler {
+    /// Session that executes every statement the handler does not intercept.
+    ctx: SessionContext,
+    /// Paimon catalog that receives the translated DDL operations.
+    catalog: Arc<dyn Catalog>,
+    /// The catalog name registered in the SessionContext. A three-part table
+    /// name must start with it, and it is stripped before resolving the table.
+    catalog_name: String,
+}
+
+impl PaimonDdlHandler {
+    /// Create a handler that runs Paimon DDL against `catalog` and delegates
+    /// all other SQL to `ctx`.
+    ///
+    /// `catalog_name` is the name the Paimon catalog is registered under in
+    /// `ctx`; three-part table names must start with it.
+    pub fn new(
+        ctx: SessionContext,
+        catalog: Arc<dyn Catalog>,
+        catalog_name: impl Into<String>,
+    ) -> Self {
+        Self {
+            ctx,
+            catalog,
+            catalog_name: catalog_name.into(),
+        }
+    }
+
+    /// Returns a reference to the inner [`SessionContext`].
+    pub fn ctx(&self) -> &SessionContext {
+        &self.ctx
+    }
+
+    /// Execute a single SQL statement.
+    ///
+    /// `CREATE TABLE` and `ALTER TABLE` are handled by Paimon directly and
+    /// return a one-row DataFrame whose `result` column holds `OK`; everything
+    /// else is delegated to DataFusion.
+    ///
+    /// # Errors
+    /// Returns a plan error if the SQL fails to parse or does not contain
+    /// exactly one statement; catalog and DataFusion errors are propagated.
+    pub async fn sql(&self, sql: &str) -> DFResult<DataFrame> {
+        let dialect = GenericDialect {};
+        let statements = Parser::parse_sql(&dialect, sql)
+            .map_err(|e| DataFusionError::Plan(format!("SQL parse error: {e}")))?;
+
+        if statements.len() != 1 {
+            return Err(DataFusionError::Plan(
+                "Expected exactly one SQL statement".to_string(),
+            ));
+        }
+
+        match &statements[0] {
+            Statement::CreateTable(create_table) => self.handle_create_table(create_table).await,
+            Statement::AlterTable {
+                name,
+                operations,
+                if_exists,
+                ..
+            } => self.handle_alter_table(name, operations, *if_exists).await,
+            _ => self.ctx.sql(sql).await,
+        }
+    }
+
+    /// Translate `CREATE TABLE` into [`Catalog::create_table`].
+    ///
+    /// Rejects `EXTERNAL`, `LOCATION` and `AS SELECT`. Columns are nullable
+    /// unless declared `NOT NULL`; the primary key comes from a single
+    /// table-level `PRIMARY KEY (...)` constraint, partition keys from
+    /// `PARTITIONED BY`, and table options from `WITH (...)`.
+    async fn handle_create_table(&self, ct: &CreateTable) -> DFResult<DataFrame> {
+        if ct.external {
+            return Err(DataFusionError::Plan(
+                "CREATE EXTERNAL TABLE is not supported. Use CREATE TABLE instead.".to_string(),
+            ));
+        }
+        if ct.location.is_some() {
+            return Err(DataFusionError::Plan(
+                "LOCATION is not supported for Paimon tables. Table path is determined by the catalog warehouse.".to_string(),
+            ));
+        }
+        if ct.query.is_some() {
+            return Err(DataFusionError::Plan(
+                "CREATE TABLE AS SELECT is not yet supported for Paimon tables.".to_string(),
+            ));
+        }
+
+        let identifier = self.resolve_table_name(&ct.name)?;
+
+        let mut builder = paimon::spec::Schema::builder();
+
+        // Columns
+        for col in &ct.columns {
+            let arrow_type = sql_data_type_to_arrow(&col.data_type)?;
+            let nullable = !col.options.iter().any(|opt| {
+                matches!(
+                    opt.option,
+                    datafusion::sql::sqlparser::ast::ColumnOption::NotNull
+                )
+            });
+            let paimon_type =
+                arrow_to_paimon_type(&arrow_type, nullable).map_err(to_datafusion_error)?;
+            builder = builder.column(col.name.value.clone(), paimon_type);
+        }
+
+        // Primary key from constraints: PRIMARY KEY (col, ...)
+        let mut has_primary_key = false;
+        for constraint in &ct.constraints {
+            if let datafusion::sql::sqlparser::ast::TableConstraint::PrimaryKey {
+                columns, ..
+            } = constraint
+            {
+                // A second constraint would otherwise silently replace the first.
+                if has_primary_key {
+                    return Err(DataFusionError::Plan(
+                        "Only one PRIMARY KEY constraint is allowed".to_string(),
+                    ));
+                }
+                has_primary_key = true;
+                let pk_cols = columns
+                    .iter()
+                    .map(|c| match &c.column.expr {
+                        // Use the unquoted identifier so it matches the column
+                        // names taken from `col.name.value` above; `to_string()`
+                        // would keep the quotes of e.g. `"Id"`.
+                        datafusion::sql::sqlparser::ast::Expr::Identifier(ident) => {
+                            Ok(ident.value.clone())
+                        }
+                        other => Err(DataFusionError::Plan(format!(
+                            "PRIMARY KEY columns must be plain column names, got: {other}"
+                        ))),
+                    })
+                    .collect::<DFResult<Vec<String>>>()?;
+                builder = builder.primary_key(pk_cols);
+            }
+        }
+
+        // Partition keys from PARTITIONED BY (col, ...)
+        if let HiveDistributionStyle::PARTITIONED { columns } = &ct.hive_distribution {
+            let partition_keys: Vec<String> =
+                columns.iter().map(|c| c.name.value.clone()).collect();
+            builder = builder.partition_keys(partition_keys);
+        }
+
+        // Table options from WITH ('key' = 'value', ...)
+        for (k, v) in extract_options(&ct.table_options)? {
+            builder = builder.option(k, v);
+        }
+
+        let schema = builder.build().map_err(to_datafusion_error)?;
+
+        self.catalog
+            .create_table(&identifier, schema, ct.if_not_exists)
+            .await
+            .map_err(to_datafusion_error)?;
+
+        ok_result(&self.ctx)
+    }
+
+    /// Translate `ALTER TABLE` into Paimon schema changes and/or a rename.
+    ///
+    /// ADD/DROP/RENAME COLUMN operations are collected into one
+    /// [`Catalog::alter_table`] call; `RENAME TO` becomes
+    /// [`Catalog::rename_table`]. `if_exists` is forwarded to both calls.
+    async fn handle_alter_table(
+        &self,
+        name: &ObjectName,
+        operations: &[AlterTableOperation],
+        if_exists: bool,
+    ) -> DFResult<DataFrame> {
+        let identifier = self.resolve_table_name(name)?;
+
+        let mut changes = Vec::new();
+        let mut rename_to: Option<Identifier> = None;
+
+        for op in operations {
+            match op {
+                AlterTableOperation::AddColumn { column_def, .. } => {
+                    changes.push(column_def_to_add_column(column_def)?);
+                }
+                AlterTableOperation::DropColumn { column_names, .. } => {
+                    changes.extend(
+                        column_names
+                            .iter()
+                            .map(|col| SchemaChange::drop_column(col.value.clone())),
+                    );
+                }
+                AlterTableOperation::RenameColumn {
+                    old_column_name,
+                    new_column_name,
+                } => {
+                    changes.push(SchemaChange::rename_column(
+                        old_column_name.value.clone(),
+                        new_column_name.value.clone(),
+                    ));
+                }
+                AlterTableOperation::RenameTable { table_name } => {
+                    let target = match table_name {
+                        RenameTableNameKind::To(target) | RenameTableNameKind::As(target) => {
+                            target
+                        }
+                    };
+                    rename_to = Some(self.resolve_rename_target(&identifier, target)?);
+                }
+                other => {
+                    return Err(DataFusionError::Plan(format!(
+                        "Unsupported ALTER TABLE operation: {other}"
+                    )));
+                }
+            }
+        }
+
+        // Apply column changes before renaming: once the table is renamed,
+        // `identifier` no longer names it, so altering afterwards would fail
+        // (or, with IF EXISTS, silently do nothing).
+        if !changes.is_empty() {
+            self.catalog
+                .alter_table(&identifier, changes, if_exists)
+                .await
+                .map_err(to_datafusion_error)?;
+        }
+
+        if let Some(new_identifier) = rename_to {
+            self.catalog
+                .rename_table(&identifier, &new_identifier, if_exists)
+                .await
+                .map_err(to_datafusion_error)?;
+        }
+
+        ok_result(&self.ctx)
+    }
+
+    /// Resolve the target of `ALTER TABLE ... RENAME TO <target>`.
+    ///
+    /// A bare name keeps the table in `current`'s database; a qualified
+    /// `db.table` or `catalog.db.table` name is resolved like any other table
+    /// reference instead of being joined into a dotted table name.
+    fn resolve_rename_target(
+        &self,
+        current: &Identifier,
+        target: &ObjectName,
+    ) -> DFResult<Identifier> {
+        if target.0.len() > 1 {
+            self.resolve_table_name(target)
+        } else {
+            Ok(Identifier::new(
+                current.database().to_string(),
+                object_name_to_string(target),
+            ))
+        }
+    }
+
+    /// Resolve an ObjectName like `paimon.db.table` or `db.table` to a Paimon Identifier.
+    ///
+    /// A three-part name must start with this handler's catalog name; a single
+    /// part is rejected because the database cannot be inferred.
+    fn resolve_table_name(&self, name: &ObjectName) -> DFResult<Identifier> {
+        let parts: Vec<String> = name
+            .0
+            .iter()
+            .filter_map(|p| p.as_ident().map(|id| id.value.clone()))
+            .collect();
+        match parts.len() {
+            3 => {
+                // catalog.database.table — strip catalog prefix
+                if parts[0] != self.catalog_name {
+                    return Err(DataFusionError::Plan(format!(
+                        "Unknown catalog '{}', expected '{}'",
+                        parts[0], self.catalog_name
+                    )));
+                }
+                Ok(Identifier::new(parts[1].clone(), parts[2].clone()))
+            }
+            2 => Ok(Identifier::new(parts[0].clone(), parts[1].clone())),
+            1 => Err(DataFusionError::Plan(format!(
+                "ALTER TABLE requires at least database.table, got: {}",
+                parts[0]
+            ))),
+            _ => Err(DataFusionError::Plan(format!(
+                "Invalid table reference: {name}"
+            ))),
+        }
+    }
+}
+
+/// Build a Paimon [`SchemaChange::AddColumn`] from a sqlparser [`ColumnDef`].
+///
+/// The column is nullable unless it carries a `NOT NULL` option; its SQL type
+/// is converted to Arrow first and then mapped to a Paimon type.
+fn column_def_to_add_column(col: &ColumnDef) -> DFResult<SchemaChange> {
+    use datafusion::sql::sqlparser::ast::ColumnOption;
+
+    let arrow_type = sql_data_type_to_arrow(&col.data_type)?;
+    let has_not_null = col
+        .options
+        .iter()
+        .any(|opt| matches!(opt.option, ColumnOption::NotNull));
+    let paimon_type =
+        arrow_to_paimon_type(&arrow_type, !has_not_null).map_err(to_datafusion_error)?;
+    Ok(SchemaChange::add_column(col.name.value.clone(), paimon_type))
+}
+
+/// Convert a sqlparser SQL data type to an Arrow data type.
+///
+/// The Arrow type is an intermediate form that callers then map to a Paimon
+/// type with `arrow_to_paimon_type`.
+///
+/// # Errors
+/// Returns a plan error for unsupported SQL types, `ARRAY` without an element
+/// type, `TIMESTAMP` precision above 9, and `DECIMAL` precision/scale outside
+/// what `Decimal128` can hold (1 <= precision <= 38, 0 <= scale <= precision).
+fn sql_data_type_to_arrow(
+    sql_type: &datafusion::sql::sqlparser::ast::DataType,
+) -> DFResult<ArrowDataType> {
+    use datafusion::arrow::datatypes::TimeUnit;
+    use datafusion::sql::sqlparser::ast::{
+        ArrayElemTypeDef, DataType as SqlType, ExactNumberInfo, TimezoneInfo,
+    };
+    /// Largest precision representable by Arrow `Decimal128`.
+    const MAX_DECIMAL128_PRECISION: u8 = 38;
+
+    match sql_type {
+        SqlType::Boolean => Ok(ArrowDataType::Boolean),
+        SqlType::TinyInt(_) => Ok(ArrowDataType::Int8),
+        SqlType::SmallInt(_) => Ok(ArrowDataType::Int16),
+        SqlType::Int(_) | SqlType::Integer(_) => Ok(ArrowDataType::Int32),
+        SqlType::BigInt(_) => Ok(ArrowDataType::Int64),
+        SqlType::Float(_) => Ok(ArrowDataType::Float32),
+        SqlType::Real => Ok(ArrowDataType::Float32),
+        SqlType::Double(_) | SqlType::DoublePrecision => Ok(ArrowDataType::Float64),
+        SqlType::Varchar(_) | SqlType::CharVarying(_) | SqlType::Text | SqlType::String(_) => {
+            Ok(ArrowDataType::Utf8)
+        }
+        SqlType::Char(_) | SqlType::Character(_) => Ok(ArrowDataType::Utf8),
+        SqlType::Binary(_) | SqlType::Varbinary(_) | SqlType::Blob(_) | SqlType::Bytea => {
+            Ok(ArrowDataType::Binary)
+        }
+        SqlType::Date => Ok(ArrowDataType::Date32),
+        SqlType::Timestamp(precision, tz_info) => {
+            let unit = match precision {
+                Some(0) => TimeUnit::Second,
+                Some(1..=3) | None => TimeUnit::Millisecond,
+                Some(4..=6) => TimeUnit::Microsecond,
+                Some(7..=9) => TimeUnit::Nanosecond,
+                // Previously any larger value silently became nanoseconds.
+                Some(p) => {
+                    return Err(DataFusionError::Plan(format!(
+                        "TIMESTAMP precision must be between 0 and 9, got {p}"
+                    )));
+                }
+            };
+            let tz = match tz_info {
+                TimezoneInfo::None | TimezoneInfo::WithoutTimeZone => None,
+                _ => Some("UTC".into()),
+            };
+            Ok(ArrowDataType::Timestamp(unit, tz))
+        }
+        SqlType::Decimal(info) => {
+            let invalid = || {
+                DataFusionError::Plan(format!("Invalid DECIMAL precision/scale: {sql_type}"))
+            };
+            // Checked conversions: `as` casts would silently wrap, e.g.
+            // DECIMAL(300, 2) would become precision 44.
+            let (p, s) = match info {
+                ExactNumberInfo::PrecisionAndScale(p, s) => (
+                    u8::try_from(*p).map_err(|_| invalid())?,
+                    i8::try_from(*s).map_err(|_| invalid())?,
+                ),
+                ExactNumberInfo::Precision(p) => (u8::try_from(*p).map_err(|_| invalid())?, 0),
+                ExactNumberInfo::None => (10, 0),
+            };
+            if p == 0 || p > MAX_DECIMAL128_PRECISION || s < 0 || i16::from(s) > i16::from(p) {
+                return Err(invalid());
+            }
+            Ok(ArrowDataType::Decimal128(p, s))
+        }
+        SqlType::Array(elem_def) => {
+            let elem_type = match elem_def {
+                ArrayElemTypeDef::AngleBracket(t)
+                | ArrayElemTypeDef::SquareBracket(t, _)
+                | ArrayElemTypeDef::Parenthesis(t) => sql_data_type_to_arrow(t)?,
+                ArrayElemTypeDef::None => {
+                    return Err(DataFusionError::Plan(
+                        "ARRAY type requires an element type".to_string(),
+                    ));
+                }
+            };
+            Ok(ArrowDataType::List(Arc::new(Field::new(
+                "element", elem_type, true,
+            ))))
+        }
+        SqlType::Map(key_type, value_type) => {
+            let key = sql_data_type_to_arrow(key_type)?;
+            let value = sql_data_type_to_arrow(value_type)?;
+            let entries = Field::new(
+                "entries",
+                ArrowDataType::Struct(
+                    vec![
+                        Field::new("key", key, false),
+                        Field::new("value", value, true),
+                    ]
+                    .into(),
+                ),
+                false,
+            );
+            Ok(ArrowDataType::Map(Arc::new(entries), false))
+        }
+        SqlType::Struct(fields, _) => {
+            let arrow_fields: Vec<Field> = fields
+                .iter()
+                .map(|f| {
+                    let name = f
+                        .field_name
+                        .as_ref()
+                        .map(|n| n.value.clone())
+                        .unwrap_or_default();
+                    let dt = sql_data_type_to_arrow(&f.field_type)?;
+                    Ok(Field::new(name, dt, true))
+                })
+                .collect::<DFResult<_>>()?;
+            Ok(ArrowDataType::Struct(arrow_fields.into()))
+        }
+        _ => Err(DataFusionError::Plan(format!(
+            "Unsupported SQL data type: {sql_type}"
+        ))),
+    }
+}
+
+/// Join the identifier parts of `name` with `.`, using each part's unquoted
+/// value and skipping any part that is not a plain identifier.
+fn object_name_to_string(name: &ObjectName) -> String {
+    let idents: Vec<String> = name
+        .0
+        .iter()
+        .filter_map(|part| part.as_ident())
+        .map(|ident| ident.value.clone())
+        .collect();
+    idents.join(".")
+}
+
+/// Extract key-value pairs from [`CreateTableOptions`].
+///
+/// Keys are the option identifiers' values. Values are rendered as SQL; a
+/// single-quoted literal is unquoted and its doubled quotes are un-escaped,
+/// so `'comment' = 'it''s'` yields `("comment", "it's")`.
+///
+/// # Errors
+/// Returns a plan error for any option that is not a simple `key = value` pair.
+fn extract_options(opts: &CreateTableOptions) -> DFResult<Vec<(String, String)>> {
+    let sql_options = match opts {
+        CreateTableOptions::With(options)
+        | CreateTableOptions::Options(options)
+        | CreateTableOptions::TableProperties(options)
+        | CreateTableOptions::Plain(options) => options,
+        CreateTableOptions::None => return Ok(Vec::new()),
+    };
+    sql_options
+        .iter()
+        .map(|opt| match opt {
+            SqlOption::KeyValue { key, value } => {
+                let rendered = value.to_string();
+                // sqlparser renders string literals as `'...'` with embedded
+                // single quotes doubled; undo both to recover the literal value.
+                let v = rendered
+                    .strip_prefix('\'')
+                    .and_then(|s| s.strip_suffix('\''))
+                    .map_or_else(|| rendered.clone(), |inner| inner.replace("''", "'"));
+                Ok((key.value.clone(), v))
+            }
+            other => Err(DataFusionError::Plan(format!(
+                "Unsupported table option: {other}"
+            ))),
+        })
+        .collect()
+}
+
+/// Build the DataFrame returned by successful DDL: a single non-nullable Utf8
+/// column named `result` containing one row with the value `"OK"`.
+fn ok_result(ctx: &SessionContext) -> DFResult<DataFrame> {
+    let result_field = Field::new("result", ArrowDataType::Utf8, false);
+    let schema = Arc::new(Schema::new(vec![result_field]));
+    let batch = RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(vec!["OK"]))])?;
+    ctx.read_batch(batch)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::collections::HashMap;
+ use std::sync::Mutex;
+
+ use async_trait::async_trait;
+ use datafusion::arrow::datatypes::TimeUnit;
+ use paimon::catalog::Database;
+ use paimon::spec::Schema as PaimonSchema;
+ use paimon::table::Table;
+
+ // ==================== Mock Catalog ====================
+
+ #[allow(clippy::enum_variant_names)]
+ #[derive(Debug)]
+ enum CatalogCall {
+ CreateTable {
+ identifier: Identifier,
+ schema: PaimonSchema,
+ ignore_if_exists: bool,
+ },
+ AlterTable {
+ identifier: Identifier,
+ changes: Vec<SchemaChange>,
+ ignore_if_not_exists: bool,
+ },
+ RenameTable {
+ from: Identifier,
+ to: Identifier,
+ ignore_if_not_exists: bool,
+ },
+ }
+
+ struct MockCatalog {
+ calls: Mutex<Vec<CatalogCall>>,
+ }
+
+ impl MockCatalog {
+ fn new() -> Self {
+ Self {
+ calls: Mutex::new(Vec::new()),
+ }
+ }
+
+ fn take_calls(&self) -> Vec<CatalogCall> {
+ std::mem::take(&mut *self.calls.lock().unwrap())
+ }
+ }
+
+ #[async_trait]
+ impl Catalog for MockCatalog {
+ async fn list_databases(&self) -> paimon::Result<Vec<String>> {
+ Ok(vec![])
+ }
+ async fn create_database(
+ &self,
+ _name: &str,
+ _ignore_if_exists: bool,
+ _properties: HashMap<String, String>,
+ ) -> paimon::Result<()> {
+ Ok(())
+ }
+ async fn get_database(&self, _name: &str) -> paimon::Result<Database> {
+ unimplemented!()
+ }
+ async fn drop_database(
+ &self,
+ _name: &str,
+ _ignore_if_not_exists: bool,
+ _cascade: bool,
+ ) -> paimon::Result<()> {
+ Ok(())
+ }
+ async fn get_table(&self, _identifier: &Identifier) ->
paimon::Result<Table> {
+ unimplemented!()
+ }
+ async fn list_tables(&self, _database_name: &str) ->
paimon::Result<Vec<String>> {
+ Ok(vec![])
+ }
+ async fn create_table(
+ &self,
+ identifier: &Identifier,
+ creation: PaimonSchema,
+ ignore_if_exists: bool,
+ ) -> paimon::Result<()> {
+ self.calls.lock().unwrap().push(CatalogCall::CreateTable {
+ identifier: identifier.clone(),
+ schema: creation,
+ ignore_if_exists,
+ });
+ Ok(())
+ }
+ async fn drop_table(
+ &self,
+ _identifier: &Identifier,
+ _ignore_if_not_exists: bool,
+ ) -> paimon::Result<()> {
+ Ok(())
+ }
+ async fn rename_table(
+ &self,
+ from: &Identifier,
+ to: &Identifier,
+ ignore_if_not_exists: bool,
+ ) -> paimon::Result<()> {
+ self.calls.lock().unwrap().push(CatalogCall::RenameTable {
+ from: from.clone(),
+ to: to.clone(),
+ ignore_if_not_exists,
+ });
+ Ok(())
+ }
+ async fn alter_table(
+ &self,
+ identifier: &Identifier,
+ changes: Vec<SchemaChange>,
+ ignore_if_not_exists: bool,
+ ) -> paimon::Result<()> {
+ self.calls.lock().unwrap().push(CatalogCall::AlterTable {
+ identifier: identifier.clone(),
+ changes,
+ ignore_if_not_exists,
+ });
+ Ok(())
+ }
+ }
+
+ fn make_handler(catalog: Arc<MockCatalog>) -> PaimonDdlHandler {
+ PaimonDdlHandler::new(SessionContext::new(), catalog, "paimon")
+ }
+
+ // ==================== sql_data_type_to_arrow tests ====================
+
+ #[test]
+ fn test_sql_type_boolean() {
+ use datafusion::sql::sqlparser::ast::DataType as SqlType;
+ assert_eq!(
+ sql_data_type_to_arrow(&SqlType::Boolean).unwrap(),
+ ArrowDataType::Boolean
+ );
+ }
+
+ #[test]
+ fn test_sql_type_integers() {
+ use datafusion::sql::sqlparser::ast::DataType as SqlType;
+ assert_eq!(
+ sql_data_type_to_arrow(&SqlType::TinyInt(None)).unwrap(),
+ ArrowDataType::Int8
+ );
+ assert_eq!(
+ sql_data_type_to_arrow(&SqlType::SmallInt(None)).unwrap(),
+ ArrowDataType::Int16
+ );
+ assert_eq!(
+ sql_data_type_to_arrow(&SqlType::Int(None)).unwrap(),
+ ArrowDataType::Int32
+ );
+ assert_eq!(
+ sql_data_type_to_arrow(&SqlType::Integer(None)).unwrap(),
+ ArrowDataType::Int32
+ );
+ assert_eq!(
+ sql_data_type_to_arrow(&SqlType::BigInt(None)).unwrap(),
+ ArrowDataType::Int64
+ );
+ }
+
+ #[test]
+ fn test_sql_type_floats() {
+ use datafusion::sql::sqlparser::ast::{DataType as SqlType,
ExactNumberInfo};
+ assert_eq!(
+
sql_data_type_to_arrow(&SqlType::Float(ExactNumberInfo::None)).unwrap(),
+ ArrowDataType::Float32
+ );
+ assert_eq!(
+ sql_data_type_to_arrow(&SqlType::Real).unwrap(),
+ ArrowDataType::Float32
+ );
+ assert_eq!(
+ sql_data_type_to_arrow(&SqlType::DoublePrecision).unwrap(),
+ ArrowDataType::Float64
+ );
+ }
+
+ #[test]
+ fn test_sql_type_string_variants() {
+ use datafusion::sql::sqlparser::ast::DataType as SqlType;
+ for sql_type in [SqlType::Varchar(None), SqlType::Text,
SqlType::String(None)] {
+ assert_eq!(
+ sql_data_type_to_arrow(&sql_type).unwrap(),
+ ArrowDataType::Utf8,
+ "failed for {sql_type:?}"
+ );
+ }
+ }
+
+ #[test]
+ fn test_sql_type_binary() {
+ use datafusion::sql::sqlparser::ast::DataType as SqlType;
+ assert_eq!(
+ sql_data_type_to_arrow(&SqlType::Bytea).unwrap(),
+ ArrowDataType::Binary
+ );
+ }
+
+ #[test]
+ fn test_sql_type_date() {
+ use datafusion::sql::sqlparser::ast::DataType as SqlType;
+ assert_eq!(
+ sql_data_type_to_arrow(&SqlType::Date).unwrap(),
+ ArrowDataType::Date32
+ );
+ }
+
+ #[test]
+ fn test_sql_type_timestamp_default() {
+ use datafusion::sql::sqlparser::ast::{DataType as SqlType,
TimezoneInfo};
+ let result = sql_data_type_to_arrow(&SqlType::Timestamp(None,
TimezoneInfo::None)).unwrap();
+ assert_eq!(
+ result,
+ ArrowDataType::Timestamp(TimeUnit::Millisecond, None)
+ );
+ }
+
+ #[test]
+ fn test_sql_type_timestamp_with_precision() {
+ use datafusion::sql::sqlparser::ast::{DataType as SqlType,
TimezoneInfo};
+ // precision 0 => Second
+ assert_eq!(
+ sql_data_type_to_arrow(&SqlType::Timestamp(Some(0),
TimezoneInfo::None)).unwrap(),
+ ArrowDataType::Timestamp(TimeUnit::Second, None)
+ );
+ // precision 3 => Millisecond
+ assert_eq!(
+ sql_data_type_to_arrow(&SqlType::Timestamp(Some(3),
TimezoneInfo::None)).unwrap(),
+ ArrowDataType::Timestamp(TimeUnit::Millisecond, None)
+ );
+ // precision 6 => Microsecond
+ assert_eq!(
+ sql_data_type_to_arrow(&SqlType::Timestamp(Some(6),
TimezoneInfo::None)).unwrap(),
+ ArrowDataType::Timestamp(TimeUnit::Microsecond, None)
+ );
+ // precision 9 => Nanosecond
+ assert_eq!(
+ sql_data_type_to_arrow(&SqlType::Timestamp(Some(9),
TimezoneInfo::None)).unwrap(),
+ ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)
+ );
+ }
+
+ #[test]
+ fn test_sql_type_timestamp_with_tz() {
+ use datafusion::sql::sqlparser::ast::{DataType as SqlType,
TimezoneInfo};
+ let result =
+ sql_data_type_to_arrow(&SqlType::Timestamp(None,
TimezoneInfo::WithTimeZone)).unwrap();
+ assert_eq!(
+ result,
+ ArrowDataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()))
+ );
+ }
+
+ #[test]
+ fn test_sql_type_decimal() {
+ use datafusion::sql::sqlparser::ast::{DataType as SqlType,
ExactNumberInfo};
+ assert_eq!(
+
sql_data_type_to_arrow(&SqlType::Decimal(ExactNumberInfo::PrecisionAndScale(18,
2)))
+ .unwrap(),
+ ArrowDataType::Decimal128(18, 2)
+ );
+ assert_eq!(
+
sql_data_type_to_arrow(&SqlType::Decimal(ExactNumberInfo::Precision(10))).unwrap(),
+ ArrowDataType::Decimal128(10, 0)
+ );
+ assert_eq!(
+
sql_data_type_to_arrow(&SqlType::Decimal(ExactNumberInfo::None)).unwrap(),
+ ArrowDataType::Decimal128(10, 0)
+ );
+ }
+
+ #[test]
+ fn test_sql_type_unsupported() {
+ use datafusion::sql::sqlparser::ast::DataType as SqlType;
+ assert!(sql_data_type_to_arrow(&SqlType::Regclass).is_err());
+ }
+
+ #[test]
+ fn test_sql_type_array() {
+ use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as
SqlType};
+ let result =
sql_data_type_to_arrow(&SqlType::Array(ArrayElemTypeDef::AngleBracket(
+ Box::new(SqlType::Int(None)),
+ )))
+ .unwrap();
+ assert_eq!(
+ result,
+ ArrowDataType::List(Arc::new(Field::new("element",
ArrowDataType::Int32, true)))
+ );
+ }
+
+ #[test]
+ fn test_sql_type_array_no_element() {
+ use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as
SqlType};
+
assert!(sql_data_type_to_arrow(&SqlType::Array(ArrayElemTypeDef::None)).is_err());
+ }
+
+ #[test]
+ fn test_sql_type_map() {
+ use datafusion::sql::sqlparser::ast::DataType as SqlType;
+ let result = sql_data_type_to_arrow(&SqlType::Map(
+ Box::new(SqlType::Varchar(None)),
+ Box::new(SqlType::Int(None)),
+ ))
+ .unwrap();
+ let expected = ArrowDataType::Map(
+ Arc::new(Field::new(
+ "entries",
+ ArrowDataType::Struct(
+ vec![
+ Field::new("key", ArrowDataType::Utf8, false),
+ Field::new("value", ArrowDataType::Int32, true),
+ ]
+ .into(),
+ ),
+ false,
+ )),
+ false,
+ );
+ assert_eq!(result, expected);
+ }
+
+ #[test]
+ fn test_sql_type_struct() {
+ use datafusion::sql::sqlparser::ast::{
+ DataType as SqlType, Ident, StructBracketKind, StructField,
+ };
+ let result = sql_data_type_to_arrow(&SqlType::Struct(
+ vec![
+ StructField {
+ field_name: Some(Ident::new("name")),
+ field_type: SqlType::Varchar(None),
+ options: None,
+ },
+ StructField {
+ field_name: Some(Ident::new("age")),
+ field_type: SqlType::Int(None),
+ options: None,
+ },
+ ],
+ StructBracketKind::AngleBrackets,
+ ))
+ .unwrap();
+ assert_eq!(
+ result,
+ ArrowDataType::Struct(
+ vec![
+ Field::new("name", ArrowDataType::Utf8, true),
+ Field::new("age", ArrowDataType::Int32, true),
+ ]
+ .into()
+ )
+ );
+ }
+
+ // ==================== resolve_table_name tests ====================
+
+ #[test]
+ fn test_resolve_three_part_name() {
+ let catalog = Arc::new(MockCatalog::new());
+ let handler = make_handler(catalog);
+ let dialect = GenericDialect {};
+ let stmts = Parser::parse_sql(&dialect, "SELECT * FROM
paimon.mydb.mytable").unwrap();
+ if let Statement::Query(q) = &stmts[0] {
+ if let datafusion::sql::sqlparser::ast::SetExpr::Select(sel) =
q.body.as_ref() {
+ if let datafusion::sql::sqlparser::ast::TableFactor::Table {
name, .. } =
+ &sel.from[0].relation
+ {
+ let id = handler.resolve_table_name(name).unwrap();
+ assert_eq!(id.database(), "mydb");
+ assert_eq!(id.object(), "mytable");
+ }
+ }
+ }
+ }
+
+ #[test]
+ fn test_resolve_two_part_name() {
+ let catalog = Arc::new(MockCatalog::new());
+ let handler = make_handler(catalog);
+ let dialect = GenericDialect {};
+ let stmts = Parser::parse_sql(&dialect, "SELECT * FROM
mydb.mytable").unwrap();
+ if let Statement::Query(q) = &stmts[0] {
+ if let datafusion::sql::sqlparser::ast::SetExpr::Select(sel) =
q.body.as_ref() {
+ if let datafusion::sql::sqlparser::ast::TableFactor::Table {
name, .. } =
+ &sel.from[0].relation
+ {
+ let id = handler.resolve_table_name(name).unwrap();
+ assert_eq!(id.database(), "mydb");
+ assert_eq!(id.object(), "mytable");
+ }
+ }
+ }
+ }
+
+ #[test]
+ fn test_resolve_wrong_catalog_name() {
+ let catalog = Arc::new(MockCatalog::new());
+ let handler = make_handler(catalog);
+ let dialect = GenericDialect {};
+ let stmts = Parser::parse_sql(&dialect, "SELECT * FROM
other.mydb.mytable").unwrap();
+ if let Statement::Query(q) = &stmts[0] {
+ if let datafusion::sql::sqlparser::ast::SetExpr::Select(sel) =
q.body.as_ref() {
+ if let datafusion::sql::sqlparser::ast::TableFactor::Table {
name, .. } =
+ &sel.from[0].relation
+ {
+ let err = handler.resolve_table_name(name).unwrap_err();
+ assert!(err.to_string().contains("Unknown catalog"));
+ }
+ }
+ }
+ }
+
+ #[test]
+ fn test_resolve_single_part_name_error() {
+ let catalog = Arc::new(MockCatalog::new());
+ let handler = make_handler(catalog);
+ let dialect = GenericDialect {};
+ let stmts = Parser::parse_sql(&dialect, "SELECT * FROM
mytable").unwrap();
+ if let Statement::Query(q) = &stmts[0] {
+ if let datafusion::sql::sqlparser::ast::SetExpr::Select(sel) =
q.body.as_ref() {
+ if let datafusion::sql::sqlparser::ast::TableFactor::Table {
name, .. } =
+ &sel.from[0].relation
+ {
+ let err = handler.resolve_table_name(name).unwrap_err();
+ assert!(err.to_string().contains("at least
database.table"));
+ }
+ }
+ }
+ }
+
+ // ==================== extract_options tests ====================
+
+ #[test]
+ fn test_extract_options_none() {
+ let opts = extract_options(&CreateTableOptions::None).unwrap();
+ assert!(opts.is_empty());
+ }
+
+ #[test]
+ fn test_extract_options_with_kv() {
+ // Parse a CREATE TABLE with WITH options to get a real
CreateTableOptions
+ let dialect = GenericDialect {};
+ let stmts =
+ Parser::parse_sql(&dialect, "CREATE TABLE t (id INT) WITH
('bucket' = '4')").unwrap();
+ if let Statement::CreateTable(ct) = &stmts[0] {
+ let opts = extract_options(&ct.table_options).unwrap();
+ assert_eq!(opts.len(), 1);
+ assert_eq!(opts[0].0, "bucket");
+ assert_eq!(opts[0].1, "4");
+ } else {
+ panic!("expected CreateTable");
+ }
+ }
+
+ // ==================== PaimonDdlHandler::sql integration tests
====================
+
+ #[tokio::test]
+ async fn test_create_table_basic() {
+ let catalog = Arc::new(MockCatalog::new());
+ let handler = make_handler(catalog.clone());
+
+ handler
+ .sql("CREATE TABLE mydb.t1 (id INT NOT NULL, name VARCHAR, PRIMARY
KEY (id))")
+ .await
+ .unwrap();
+
+ let calls = catalog.take_calls();
+ assert_eq!(calls.len(), 1);
+ if let CatalogCall::CreateTable {
+ identifier,
+ schema,
+ ignore_if_exists,
+ } = &calls[0]
+ {
+ assert_eq!(identifier.database(), "mydb");
+ assert_eq!(identifier.object(), "t1");
+ assert!(!ignore_if_exists);
+ assert_eq!(schema.primary_keys(), &["id"]);
+ } else {
+ panic!("expected CreateTable call");
+ }
+ }
+
+ #[tokio::test]
+ async fn test_create_table_if_not_exists() {
+ let catalog = Arc::new(MockCatalog::new());
+ let handler = make_handler(catalog.clone());
+
+ handler
+ .sql("CREATE TABLE IF NOT EXISTS mydb.t1 (id INT)")
+ .await
+ .unwrap();
+
+ let calls = catalog.take_calls();
+ assert_eq!(calls.len(), 1);
+ if let CatalogCall::CreateTable {
+ ignore_if_exists, ..
+ } = &calls[0]
+ {
+ assert!(ignore_if_exists);
+ } else {
+ panic!("expected CreateTable call");
+ }
+ }
+
+    /// Key/value pairs from the `WITH (...)` clause end up in the schema options.
+    #[tokio::test]
+    async fn test_create_table_with_options() {
+        let mock_catalog = Arc::new(MockCatalog::new());
+        let ddl = make_handler(mock_catalog.clone());
+
+        ddl.sql("CREATE TABLE mydb.t1 (id INT) WITH ('bucket' = '4', 'file.format' = 'parquet')")
+            .await
+            .unwrap();
+
+        let recorded = mock_catalog.take_calls();
+        assert_eq!(recorded.len(), 1);
+        let CatalogCall::CreateTable { schema, .. } = &recorded[0] else {
+            panic!("expected CreateTable call");
+        };
+        let table_options = schema.options();
+        assert_eq!(table_options.get("bucket").unwrap(), "4");
+        assert_eq!(table_options.get("file.format").unwrap(), "parquet");
+    }
+
+    /// A `catalog.database.table` name drops the catalog part and maps the remaining
+    /// two parts onto the Paimon identifier.
+    #[tokio::test]
+    async fn test_create_table_three_part_name() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog.clone());
+
+        handler
+            .sql("CREATE TABLE paimon.mydb.t1 (id INT)")
+            .await
+            .unwrap();
+
+        let calls = catalog.take_calls();
+        // Like the sibling tests, pin the call count so extra or duplicate catalog
+        // calls are caught instead of only inspecting the first one.
+        assert_eq!(calls.len(), 1);
+        if let CatalogCall::CreateTable { identifier, .. } = &calls[0] {
+            assert_eq!(identifier.database(), "mydb");
+            assert_eq!(identifier.object(), "t1");
+        } else {
+            panic!("expected CreateTable call");
+        }
+    }
+
+    /// `ADD COLUMN` becomes a single `SchemaChange::AddColumn` for the named column.
+    #[tokio::test]
+    async fn test_alter_table_add_column() {
+        let mock_catalog = Arc::new(MockCatalog::new());
+        let ddl = make_handler(mock_catalog.clone());
+
+        ddl.sql("ALTER TABLE mydb.t1 ADD COLUMN age INT")
+            .await
+            .unwrap();
+
+        let recorded = mock_catalog.take_calls();
+        assert_eq!(recorded.len(), 1);
+        let CatalogCall::AlterTable {
+            identifier,
+            changes,
+            ..
+        } = &recorded[0]
+        else {
+            panic!("expected AlterTable call");
+        };
+        assert_eq!(identifier.database(), "mydb");
+        assert_eq!(identifier.object(), "t1");
+        assert_eq!(changes.len(), 1);
+        let adds_age = matches!(
+            &changes[0],
+            SchemaChange::AddColumn { field_name, .. } if field_name == "age"
+        );
+        assert!(adds_age);
+    }
+
+    /// `DROP COLUMN` becomes a single `SchemaChange::DropColumn` for the named column.
+    #[tokio::test]
+    async fn test_alter_table_drop_column() {
+        let mock_catalog = Arc::new(MockCatalog::new());
+        let ddl = make_handler(mock_catalog.clone());
+
+        ddl.sql("ALTER TABLE mydb.t1 DROP COLUMN age")
+            .await
+            .unwrap();
+
+        let recorded = mock_catalog.take_calls();
+        assert_eq!(recorded.len(), 1);
+        let CatalogCall::AlterTable { changes, .. } = &recorded[0] else {
+            panic!("expected AlterTable call");
+        };
+        assert_eq!(changes.len(), 1);
+        let drops_age = matches!(
+            &changes[0],
+            SchemaChange::DropColumn { field_name } if field_name == "age"
+        );
+        assert!(drops_age);
+    }
+
+    /// `RENAME COLUMN a TO b` becomes a single `SchemaChange::RenameColumn`.
+    #[tokio::test]
+    async fn test_alter_table_rename_column() {
+        let mock_catalog = Arc::new(MockCatalog::new());
+        let ddl = make_handler(mock_catalog.clone());
+
+        ddl.sql("ALTER TABLE mydb.t1 RENAME COLUMN old_name TO new_name")
+            .await
+            .unwrap();
+
+        let recorded = mock_catalog.take_calls();
+        assert_eq!(recorded.len(), 1);
+        let CatalogCall::AlterTable { changes, .. } = &recorded[0] else {
+            panic!("expected AlterTable call");
+        };
+        assert_eq!(changes.len(), 1);
+        let renames_column = matches!(
+            &changes[0],
+            SchemaChange::RenameColumn { field_name, new_name }
+                if field_name == "old_name" && new_name == "new_name"
+        );
+        assert!(renames_column);
+    }
+
+    /// `RENAME TO` issues a `rename_table` call that keeps the source database.
+    #[tokio::test]
+    async fn test_alter_table_rename_table() {
+        let mock_catalog = Arc::new(MockCatalog::new());
+        let ddl = make_handler(mock_catalog.clone());
+
+        ddl.sql("ALTER TABLE mydb.t1 RENAME TO t2")
+            .await
+            .unwrap();
+
+        let recorded = mock_catalog.take_calls();
+        assert_eq!(recorded.len(), 1);
+        let CatalogCall::RenameTable { from, to, .. } = &recorded[0] else {
+            panic!("expected RenameTable call");
+        };
+        assert_eq!(from.database(), "mydb");
+        assert_eq!(from.object(), "t1");
+        assert_eq!(to.database(), "mydb");
+        assert_eq!(to.object(), "t2");
+    }
+
+    /// `ALTER TABLE IF EXISTS` is forwarded as `ignore_if_not_exists = true`.
+    #[tokio::test]
+    async fn test_alter_table_if_exists_add_column() {
+        let mock_catalog = Arc::new(MockCatalog::new());
+        let ddl = make_handler(mock_catalog.clone());
+
+        ddl.sql("ALTER TABLE IF EXISTS mydb.t1 ADD COLUMN age INT")
+            .await
+            .unwrap();
+
+        let recorded = mock_catalog.take_calls();
+        assert_eq!(recorded.len(), 1);
+        let CatalogCall::AlterTable {
+            ignore_if_not_exists,
+            ..
+        } = &recorded[0]
+        else {
+            panic!("expected AlterTable call");
+        };
+        assert!(ignore_if_not_exists);
+    }
+
+    /// Without `IF EXISTS`, the alter call is issued with `ignore_if_not_exists = false`.
+    #[tokio::test]
+    async fn test_alter_table_without_if_exists() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog.clone());
+
+        handler
+            .sql("ALTER TABLE mydb.t1 ADD COLUMN age INT")
+            .await
+            .unwrap();
+
+        let calls = catalog.take_calls();
+        // Pin the call count like the sibling tests do, so duplicate calls are caught.
+        assert_eq!(calls.len(), 1);
+        if let CatalogCall::AlterTable {
+            ignore_if_not_exists,
+            ..
+        } = &calls[0]
+        {
+            assert!(!ignore_if_not_exists);
+        } else {
+            panic!("expected AlterTable call");
+        }
+    }
+
+    /// `ALTER TABLE IF EXISTS ... RENAME TO` forwards the flag to `rename_table`.
+    #[tokio::test]
+    async fn test_alter_table_if_exists_rename() {
+        let mock_catalog = Arc::new(MockCatalog::new());
+        let ddl = make_handler(mock_catalog.clone());
+
+        ddl.sql("ALTER TABLE IF EXISTS mydb.t1 RENAME TO t2")
+            .await
+            .unwrap();
+
+        let recorded = mock_catalog.take_calls();
+        assert_eq!(recorded.len(), 1);
+        let CatalogCall::RenameTable {
+            from,
+            to,
+            ignore_if_not_exists,
+        } = &recorded[0]
+        else {
+            panic!("expected RenameTable call");
+        };
+        assert!(ignore_if_not_exists);
+        assert_eq!(from.object(), "t1");
+        assert_eq!(to.object(), "t2");
+    }
+
+    /// A three-part source name is renamed within its own database.
+    #[tokio::test]
+    async fn test_alter_table_rename_three_part_name() {
+        let mock_catalog = Arc::new(MockCatalog::new());
+        let ddl = make_handler(mock_catalog.clone());
+
+        ddl.sql("ALTER TABLE paimon.mydb.t1 RENAME TO t2")
+            .await
+            .unwrap();
+
+        let recorded = mock_catalog.take_calls();
+        assert_eq!(recorded.len(), 1);
+        let CatalogCall::RenameTable { from, to, .. } = &recorded[0] else {
+            panic!("expected RenameTable call");
+        };
+        assert_eq!(from.database(), "mydb");
+        assert_eq!(from.object(), "t1");
+        assert_eq!(to.database(), "mydb");
+        assert_eq!(to.object(), "t2");
+    }
+
+ #[tokio::test]
+ async fn test_sql_parse_error() {
+ let catalog = Arc::new(MockCatalog::new());
+ let handler = make_handler(catalog);
+ let result = handler.sql("NOT VALID SQL !!!").await;
+ assert!(result.is_err());
+ assert!(result.unwrap_err().to_string().contains("SQL parse error"));
+ }
+
+    /// More than one statement in a single call is rejected.
+    #[tokio::test]
+    async fn test_multiple_statements_error() {
+        let mock_catalog = Arc::new(MockCatalog::new());
+        let ddl = make_handler(mock_catalog);
+        let outcome = ddl.sql("SELECT 1; SELECT 2").await;
+        assert!(outcome.is_err());
+        let message = outcome.unwrap_err().to_string();
+        assert!(message.contains("exactly one SQL statement"));
+    }
+
+    /// The DDL handler refuses `CREATE EXTERNAL TABLE` with an explicit message.
+    #[tokio::test]
+    async fn test_create_external_table_rejected() {
+        let mock_catalog = Arc::new(MockCatalog::new());
+        let ddl = make_handler(mock_catalog);
+        let outcome = ddl
+            .sql("CREATE EXTERNAL TABLE mydb.t1 (id INT) STORED AS PARQUET")
+            .await;
+        assert!(outcome.is_err());
+        let message = outcome.unwrap_err().to_string();
+        assert!(message.contains("CREATE EXTERNAL TABLE is not supported"));
+    }
+
+    /// Non-DDL statements run through DataFusion and leave the catalog untouched.
+    #[tokio::test]
+    async fn test_non_ddl_delegates_to_datafusion() {
+        let mock_catalog = Arc::new(MockCatalog::new());
+        let ddl = make_handler(mock_catalog.clone());
+        // SELECT is not intercepted by the DDL handler; DataFusion executes it.
+        let batches = ddl
+            .sql("SELECT 1 AS x")
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 1);
+        // Nothing was forwarded to the Paimon catalog.
+        assert!(mock_catalog.take_calls().is_empty());
+    }
+}
diff --git a/crates/integrations/datafusion/src/lib.rs
b/crates/integrations/datafusion/src/lib.rs
index 4e9fdb3..e803153 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -37,6 +37,7 @@
//! translatable partition-only conjuncts from DataFusion filters.
mod catalog;
+mod ddl;
mod error;
mod filter_pushdown;
#[cfg(feature = "fulltext")]
@@ -47,6 +48,7 @@ pub mod runtime;
mod table;
pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
+pub use ddl::PaimonDdlHandler;
pub use error::to_datafusion_error;
#[cfg(feature = "fulltext")]
pub use full_text_search::{register_full_text_search, FullTextSearchFunction};
diff --git a/crates/integrations/datafusion/tests/ddl_tests.rs
b/crates/integrations/datafusion/tests/ddl_tests.rs
new file mode 100644
index 0000000..c520f18
--- /dev/null
+++ b/crates/integrations/datafusion/tests/ddl_tests.rs
@@ -0,0 +1,497 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DDL integration tests for paimon-datafusion.
+
+use std::sync::Arc;
+
+use datafusion::catalog::CatalogProvider;
+use datafusion::prelude::SessionContext;
+use paimon::catalog::Identifier;
+use paimon::spec::{ArrayType, DataType, IntType, MapType, VarCharType};
+use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
+use paimon_datafusion::{PaimonCatalogProvider, PaimonDdlHandler,
PaimonRelationPlanner};
+use tempfile::TempDir;
+
+/// Create a filesystem catalog whose warehouse lives in a fresh temporary directory.
+///
+/// The returned `TempDir` must be kept alive for as long as the catalog is used;
+/// dropping it deletes the warehouse.
+fn create_test_env() -> (TempDir, Arc<FileSystemCatalog>) {
+    let tmp = TempDir::new().expect("Failed to create temp dir");
+    let mut catalog_options = Options::new();
+    catalog_options.set(
+        CatalogOptions::WAREHOUSE,
+        format!("file://{}", tmp.path().display()),
+    );
+    let fs_catalog = FileSystemCatalog::new(catalog_options).expect("Failed to create catalog");
+    (tmp, Arc::new(fs_catalog))
+}
+
+/// Build a `PaimonDdlHandler` over a session that has `catalog` registered as "paimon"
+/// together with the Paimon relation planner.
+fn create_handler(catalog: Arc<FileSystemCatalog>) -> PaimonDdlHandler {
+    let session = SessionContext::new();
+    let provider = Arc::new(PaimonCatalogProvider::new(catalog.clone()));
+    session.register_catalog("paimon", provider);
+    let planner = Arc::new(PaimonRelationPlanner::new());
+    session
+        .register_relation_planner(planner)
+        .expect("Failed to register relation planner");
+    PaimonDdlHandler::new(session, catalog, "paimon")
+}
+
+// ======================= CREATE / DROP SCHEMA =======================
+
+/// `CREATE SCHEMA paimon.<db>` creates the database in the Paimon catalog.
+#[tokio::test]
+async fn test_create_schema() {
+    let (_dir, catalog) = create_test_env();
+    let ddl = create_handler(catalog.clone());
+
+    ddl.sql("CREATE SCHEMA paimon.test_db")
+        .await
+        .expect("CREATE SCHEMA should succeed");
+
+    let db_names = catalog.list_databases().await.unwrap();
+    assert!(
+        db_names.iter().any(|db| db == "test_db"),
+        "Database test_db should exist after CREATE SCHEMA"
+    );
+}
+
+/// `DROP SCHEMA ... CASCADE` removes an existing database.
+#[tokio::test]
+async fn test_drop_schema() {
+    let (_dir, catalog) = create_test_env();
+    let ddl = create_handler(catalog.clone());
+
+    catalog
+        .create_database("drop_me", false, Default::default())
+        .await
+        .unwrap();
+
+    ddl.sql("DROP SCHEMA paimon.drop_me CASCADE")
+        .await
+        .expect("DROP SCHEMA should succeed");
+
+    let db_names = catalog.list_databases().await.unwrap();
+    assert!(
+        !db_names.iter().any(|db| db == "drop_me"),
+        "Database drop_me should not exist after DROP SCHEMA"
+    );
+}
+
+/// The DataFusion catalog provider lists databases created directly in Paimon.
+#[tokio::test]
+async fn test_schema_names_via_catalog_provider() {
+    let (_dir, catalog) = create_test_env();
+    let provider = PaimonCatalogProvider::new(catalog.clone());
+    let expected = ["db_a", "db_b"];
+
+    for db in expected {
+        catalog
+            .create_database(db, false, Default::default())
+            .await
+            .unwrap();
+    }
+
+    let listed = provider.schema_names();
+    for db in expected {
+        assert!(listed.contains(&db.to_string()));
+    }
+}
+
+// ======================= CREATE TABLE =======================
+
+/// CREATE TABLE persists the columns and the PRIMARY KEY constraint.
+#[tokio::test]
+async fn test_create_table() {
+    let (_dir, catalog) = create_test_env();
+    let ddl = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    ddl.sql(
+        "CREATE TABLE paimon.mydb.users (
+            id INT NOT NULL,
+            name STRING,
+            age INT,
+            PRIMARY KEY (id)
+        )",
+    )
+    .await
+    .expect("CREATE TABLE should succeed");
+
+    let table_names = catalog.list_tables("mydb").await.unwrap();
+    assert!(
+        table_names.iter().any(|name| name == "users"),
+        "Table users should exist after CREATE TABLE"
+    );
+
+    // The stored schema has all three columns and the declared primary key.
+    let users = catalog
+        .get_table(&Identifier::new("mydb", "users"))
+        .await
+        .unwrap();
+    let users_schema = users.schema();
+    assert_eq!(users_schema.fields().len(), 3);
+    assert_eq!(users_schema.primary_keys(), &["id"]);
+}
+
+/// PARTITIONED BY, PRIMARY KEY and WITH options all reach the stored schema.
+#[tokio::test]
+async fn test_create_table_with_partition() {
+    let (_dir, catalog) = create_test_env();
+    let ddl = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    ddl.sql(
+        "CREATE TABLE paimon.mydb.events (
+            id INT NOT NULL,
+            name STRING,
+            dt STRING,
+            PRIMARY KEY (id, dt)
+        ) PARTITIONED BY (dt STRING)
+        WITH ('bucket' = '2')",
+    )
+    .await
+    .expect("CREATE TABLE with partition should succeed");
+
+    let events = catalog
+        .get_table(&Identifier::new("mydb", "events"))
+        .await
+        .unwrap();
+    let events_schema = events.schema();
+    assert_eq!(events_schema.partition_keys(), &["dt"]);
+    assert_eq!(events_schema.primary_keys(), &["id", "dt"]);
+    let expected_bucket = "2".to_string();
+    assert_eq!(
+        events_schema.options().get("bucket"),
+        Some(&expected_bucket),
+        "Table option 'bucket' should be preserved"
+    );
+}
+
+/// Running the same `CREATE TABLE IF NOT EXISTS` twice succeeds both times and
+/// leaves the table in place.
+#[tokio::test]
+async fn test_create_table_if_not_exists() {
+    let (_tmp, catalog) = create_test_env();
+    let handler = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    let sql = "CREATE TABLE IF NOT EXISTS paimon.mydb.t1 (
+        id INT NOT NULL
+    )";
+
+    // First create should succeed
+    handler.sql(sql).await.expect("First CREATE should succeed");
+
+    // Second create with IF NOT EXISTS should also succeed
+    handler
+        .sql(sql)
+        .await
+        .expect("Second CREATE with IF NOT EXISTS should succeed");
+
+    // Both calls returning Ok is not enough: a handler that silently did nothing
+    // would also pass. Check that the table really exists.
+    let tables = catalog.list_tables("mydb").await.unwrap();
+    assert!(
+        tables.contains(&"t1".to_string()),
+        "Table t1 should exist after CREATE TABLE IF NOT EXISTS"
+    );
+}
+
+/// End-to-end: `CREATE EXTERNAL TABLE` is refused with an explicit message.
+#[tokio::test]
+async fn test_create_external_table_rejected() {
+    let (_dir, catalog) = create_test_env();
+    let ddl = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    let outcome = ddl
+        .sql(
+            "CREATE EXTERNAL TABLE paimon.mydb.bad (
+                id INT NOT NULL
+            ) STORED AS PARQUET
+            LOCATION '/some/path'",
+        )
+        .await;
+
+    assert!(outcome.is_err(), "CREATE EXTERNAL TABLE should be rejected");
+    let err_msg = outcome.unwrap_err().to_string();
+    assert!(
+        err_msg.contains("CREATE EXTERNAL TABLE is not supported"),
+        "Error should mention CREATE EXTERNAL TABLE is not supported, got: {err_msg}"
+    );
+}
+
+// ======================= CREATE TABLE with complex types
=======================
+
+/// ARRAY and MAP column types are converted into Paimon `Array` / `Map` types.
+#[tokio::test]
+async fn test_create_table_with_array_and_map() {
+    let (_dir, catalog) = create_test_env();
+    let ddl = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    ddl.sql(
+        "CREATE TABLE paimon.mydb.complex_types (
+            id INT NOT NULL,
+            tags ARRAY<STRING>,
+            props MAP(STRING, INT),
+            PRIMARY KEY (id)
+        )",
+    )
+    .await
+    .expect("CREATE TABLE with ARRAY and MAP should succeed");
+
+    let complex = catalog
+        .get_table(&Identifier::new("mydb", "complex_types"))
+        .await
+        .unwrap();
+    let complex_schema = complex.schema();
+    let columns = complex_schema.fields();
+    assert_eq!(columns.len(), 3);
+    assert_eq!(complex_schema.primary_keys(), &["id"]);
+
+    // tags: ARRAY<STRING>
+    let expected_tags = DataType::Array(ArrayType::new(DataType::VarChar(
+        VarCharType::string_type(),
+    )));
+    assert_eq!(columns[1].name(), "tags");
+    assert_eq!(*columns[1].data_type(), expected_tags);
+
+    // props: MAP(STRING, INT); the expected key is the string type made non-nullable.
+    let expected_key = DataType::VarChar(VarCharType::string_type())
+        .copy_with_nullable(false)
+        .unwrap();
+    let expected_props = DataType::Map(MapType::new(expected_key, DataType::Int(IntType::new())));
+    assert_eq!(columns[2].name(), "props");
+    assert_eq!(*columns[2].data_type(), expected_props);
+}
+
+/// STRUCT column types are converted into a Paimon `Row` with named nested fields.
+#[tokio::test]
+async fn test_create_table_with_row_type() {
+    let (_dir, catalog) = create_test_env();
+    let ddl = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    ddl.sql(
+        "CREATE TABLE paimon.mydb.row_table (
+            id INT NOT NULL,
+            address STRUCT<city STRING, zip INT>,
+            PRIMARY KEY (id)
+        )",
+    )
+    .await
+    .expect("CREATE TABLE with STRUCT should succeed");
+
+    let row_table = catalog
+        .get_table(&Identifier::new("mydb", "row_table"))
+        .await
+        .unwrap();
+    let row_schema = row_table.schema();
+    assert_eq!(row_schema.fields().len(), 2);
+
+    // address: STRUCT<city STRING, zip INT>
+    let address = &row_schema.fields()[1];
+    assert_eq!(address.name(), "address");
+    let DataType::Row(nested) = address.data_type() else {
+        panic!("expected Row type for address column");
+    };
+    let nested_fields = nested.fields();
+    assert_eq!(nested_fields.len(), 2);
+    assert_eq!(nested_fields[0].name(), "city");
+    assert!(matches!(nested_fields[0].data_type(), DataType::VarChar(_)));
+    assert_eq!(nested_fields[1].name(), "zip");
+    assert!(matches!(nested_fields[1].data_type(), DataType::Int(_)));
+}
+
+// ======================= DROP TABLE =======================
+
+/// DROP TABLE removes a table that was created directly through the catalog.
+#[tokio::test]
+async fn test_drop_table() {
+    let (_dir, catalog) = create_test_env();
+    let ddl = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    // Seed a single-column table to drop.
+    let seed_schema = paimon::spec::Schema::builder()
+        .column("id", DataType::Int(IntType::new()))
+        .build()
+        .unwrap();
+    let target = Identifier::new("mydb", "to_drop");
+    catalog
+        .create_table(&target, seed_schema, false)
+        .await
+        .unwrap();
+
+    let before = catalog.list_tables("mydb").await.unwrap();
+    assert!(before.contains(&"to_drop".to_string()));
+
+    ddl.sql("DROP TABLE paimon.mydb.to_drop")
+        .await
+        .expect("DROP TABLE should succeed");
+
+    let after = catalog.list_tables("mydb").await.unwrap();
+    assert!(
+        !after.contains(&"to_drop".to_string()),
+        "Table should not exist after DROP TABLE"
+    );
+}
+
+// ======================= ALTER TABLE =======================
+
+/// End-to-end ADD COLUMN currently fails because the filesystem catalog does not
+/// implement `alter_table`; the error must say so.
+#[tokio::test]
+async fn test_alter_table_add_column() {
+    let (_dir, catalog) = create_test_env();
+    let ddl = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    let seed_schema = paimon::spec::Schema::builder()
+        .column("id", DataType::Int(IntType::new()))
+        .column("name", DataType::VarChar(VarCharType::string_type()))
+        .build()
+        .unwrap();
+    catalog
+        .create_table(&Identifier::new("mydb", "alter_test"), seed_schema, false)
+        .await
+        .unwrap();
+
+    // FileSystemCatalog reports alter_table as unsupported, so an error is expected.
+    let outcome = ddl
+        .sql("ALTER TABLE paimon.mydb.alter_test ADD COLUMN age INT")
+        .await;
+    assert!(
+        outcome.is_err(),
+        "ALTER TABLE should fail because FileSystemCatalog does not implement alter_table yet"
+    );
+
+    let err_msg = outcome.unwrap_err().to_string();
+    let mentions_unsupported =
+        err_msg.contains("not yet implemented") || err_msg.contains("Unsupported");
+    assert!(
+        mentions_unsupported,
+        "Error should indicate alter_table is not implemented, got: {err_msg}"
+    );
+}
+
+/// `ALTER TABLE <db>.<table> RENAME TO` renames the table within its database.
+#[tokio::test]
+async fn test_alter_table_rename() {
+    let (_dir, catalog) = create_test_env();
+    let ddl = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    let seed_schema = paimon::spec::Schema::builder()
+        .column("id", DataType::Int(IntType::new()))
+        .build()
+        .unwrap();
+    catalog
+        .create_table(&Identifier::new("mydb", "old_name"), seed_schema, false)
+        .await
+        .unwrap();
+
+    ddl.sql("ALTER TABLE mydb.old_name RENAME TO new_name")
+        .await
+        .expect("ALTER TABLE RENAME should succeed");
+
+    let table_names = catalog.list_tables("mydb").await.unwrap();
+    let has = |wanted: &str| table_names.iter().any(|name| name == wanted);
+    assert!(!has("old_name"), "old_name should not exist after rename");
+    assert!(has("new_name"), "new_name should exist after rename");
+}
+
+/// SELECT over a Paimon table is executed by DataFusion through the handler.
+#[tokio::test]
+async fn test_ddl_handler_delegates_select() {
+    let (_dir, catalog) = create_test_env();
+    let ddl = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    let seed_schema = paimon::spec::Schema::builder()
+        .column("id", DataType::Int(IntType::new()))
+        .build()
+        .unwrap();
+    catalog
+        .create_table(&Identifier::new("mydb", "t1"), seed_schema, false)
+        .await
+        .unwrap();
+
+    // Not DDL, so the handler hands the statement to DataFusion.
+    let frame = ddl
+        .sql("SELECT * FROM paimon.mydb.t1")
+        .await
+        .expect("SELECT should be delegated to DataFusion");
+
+    let batches = frame.collect().await.expect("SELECT should execute");
+    // Freshly created table: the query succeeds but yields no rows.
+    let row_count = batches
+        .iter()
+        .fold(0usize, |acc, batch| acc + batch.num_rows());
+    assert_eq!(row_count, 0, "Empty table should return 0 rows");
+}
diff --git a/crates/paimon/src/arrow/mod.rs b/crates/paimon/src/arrow/mod.rs
index f48b9da..e2f60fc 100644
--- a/crates/paimon/src/arrow/mod.rs
+++ b/crates/paimon/src/arrow/mod.rs
@@ -22,7 +22,11 @@ pub(crate) mod schema_evolution;
pub use crate::arrow::reader::ArrowReaderBuilder;
-use crate::spec::{DataField, DataType as PaimonDataType};
+use crate::spec::{
+ ArrayType, BigIntType, BooleanType, DataField, DataType as PaimonDataType,
DateType,
+ DecimalType, DoubleType, FloatType, IntType, LocalZonedTimestampType,
MapType, RowType,
+ SmallIntType, TimeType, TimestampType, TinyIntType, VarBinaryType,
VarCharType,
+};
use arrow_schema::DataType as ArrowDataType;
use arrow_schema::{Field as ArrowField, Schema as ArrowSchema, TimeUnit};
use std::sync::Arc;
@@ -128,6 +132,114 @@ fn timestamp_time_unit(precision: u32) ->
crate::Result<TimeUnit> {
}
}
+/// Convert an Arrow [`DataType`](ArrowDataType) to a Paimon
[`DataType`](PaimonDataType).
+pub fn arrow_to_paimon_type(
+ arrow_type: &ArrowDataType,
+ nullable: bool,
+) -> crate::Result<PaimonDataType> {
+ match arrow_type {
+ ArrowDataType::Boolean =>
Ok(PaimonDataType::Boolean(BooleanType::with_nullable(
+ nullable,
+ ))),
+ ArrowDataType::Int8 =>
Ok(PaimonDataType::TinyInt(TinyIntType::with_nullable(
+ nullable,
+ ))),
+ ArrowDataType::Int16 =>
Ok(PaimonDataType::SmallInt(SmallIntType::with_nullable(
+ nullable,
+ ))),
+ ArrowDataType::Int32 =>
Ok(PaimonDataType::Int(IntType::with_nullable(nullable))),
+ ArrowDataType::Int64 =>
Ok(PaimonDataType::BigInt(BigIntType::with_nullable(nullable))),
+ ArrowDataType::Float32 =>
Ok(PaimonDataType::Float(FloatType::with_nullable(nullable))),
+ ArrowDataType::Float64 =>
Ok(PaimonDataType::Double(DoubleType::with_nullable(nullable))),
+ ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 |
ArrowDataType::Utf8View => {
+ Ok(PaimonDataType::VarChar(VarCharType::with_nullable(
+ nullable,
+ VarCharType::MAX_LENGTH,
+ )?))
+ }
+ ArrowDataType::Binary | ArrowDataType::LargeBinary |
ArrowDataType::BinaryView => Ok(
+ PaimonDataType::VarBinary(VarBinaryType::try_new(nullable,
VarBinaryType::MAX_LENGTH)?),
+ ),
+ ArrowDataType::Date32 =>
Ok(PaimonDataType::Date(DateType::with_nullable(nullable))),
+ ArrowDataType::Timestamp(unit, tz) => {
+ let precision = match unit {
+ TimeUnit::Second => 0,
+ TimeUnit::Millisecond => 3,
+ TimeUnit::Microsecond => 6,
+ TimeUnit::Nanosecond => 9,
+ };
+ if tz.is_some() {
+ Ok(PaimonDataType::LocalZonedTimestamp(
+ LocalZonedTimestampType::with_nullable(nullable,
precision)?,
+ ))
+ } else {
+ Ok(PaimonDataType::Timestamp(TimestampType::with_nullable(
+ nullable, precision,
+ )?))
+ }
+ }
+ ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => {
+ let precision = match arrow_type {
+ ArrowDataType::Time32(TimeUnit::Second) => 0,
+ ArrowDataType::Time32(TimeUnit::Millisecond) => 3,
+ ArrowDataType::Time64(TimeUnit::Microsecond) => 6,
+ ArrowDataType::Time64(TimeUnit::Nanosecond) => 9,
+ _ => 0,
+ };
+ Ok(PaimonDataType::Time(TimeType::with_nullable(
+ nullable, precision,
+ )?))
+ }
+ ArrowDataType::Decimal128(p, s) =>
Ok(PaimonDataType::Decimal(DecimalType::with_nullable(
+ nullable, *p as u32, *s as u32,
+ )?)),
+ ArrowDataType::List(field) | ArrowDataType::LargeList(field) => {
+ let element = arrow_to_paimon_type(field.data_type(),
field.is_nullable())?;
+ Ok(PaimonDataType::Array(ArrayType::with_nullable(
+ nullable, element,
+ )))
+ }
+ ArrowDataType::Map(entries_field, _) => {
+ if let ArrowDataType::Struct(fields) = entries_field.data_type() {
+ if fields.len() == 2 {
+ let key = arrow_to_paimon_type(fields[0].data_type(),
fields[0].is_nullable())?;
+ let value =
+ arrow_to_paimon_type(fields[1].data_type(),
fields[1].is_nullable())?;
+ return Ok(PaimonDataType::Map(MapType::with_nullable(
+ nullable, key, value,
+ )));
+ }
+ }
+ Err(crate::Error::Unsupported {
+ message: format!("Unsupported Map structure: {arrow_type:?}"),
+ })
+ }
+ ArrowDataType::Struct(fields) => {
+ let field_slice: Vec<ArrowField> = fields.iter().map(|f|
f.as_ref().clone()).collect();
+ let paimon_fields = arrow_fields_to_paimon(&field_slice)?;
+ Ok(PaimonDataType::Row(RowType::with_nullable(
+ nullable,
+ paimon_fields,
+ )))
+ }
+ _ => Err(crate::Error::Unsupported {
+ message: format!("Unsupported Arrow type for Paimon conversion:
{arrow_type:?}"),
+ }),
+ }
+}
+
+/// Convert Arrow fields to Paimon [`DataField`]s.
+///
+/// Field IDs are assigned sequentially in input order, starting at 0. Each field's
+/// name is copied, and its Arrow nullability is passed to [`arrow_to_paimon_type`].
+///
+/// # Errors
+///
+/// Returns the first error produced by [`arrow_to_paimon_type`], for example for an
+/// Arrow type that has no Paimon counterpart.
+pub fn arrow_fields_to_paimon(fields: &[ArrowField]) -> crate::Result<Vec<DataField>> {
+    let mut converted = Vec::with_capacity(fields.len());
+    for (id, field) in (0..).zip(fields) {
+        let data_type = arrow_to_paimon_type(field.data_type(), field.is_nullable())?;
+        converted.push(DataField::new(id, field.name().to_string(), data_type));
+    }
+    Ok(converted)
+}
+
/// Build an Arrow [`Schema`](ArrowSchema) from Paimon [`DataField`]s.
pub fn build_target_arrow_schema(fields: &[DataField]) ->
crate::Result<Arc<ArrowSchema>> {
let arrow_fields: Vec<ArrowField> = fields
@@ -143,3 +255,213 @@ pub fn build_target_arrow_schema(fields: &[DataField]) ->
crate::Result<Arc<Arro
.collect::<crate::Result<Vec<_>>>()?;
Ok(Arc::new(ArrowSchema::new(arrow_fields)))
}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::spec::*;
+
+    /// Helper: convert paimon -> arrow (one direction only) and assert the result
+    /// equals `expected_arrow`.
+    fn assert_paimon_to_arrow(paimon: &PaimonDataType, expected_arrow: &ArrowDataType) {
+        let arrow = paimon_type_to_arrow(paimon).unwrap();
+        assert_eq!(&arrow, expected_arrow, "paimon_type_to_arrow mismatch");
+    }
+
+    /// Helper: convert arrow -> paimon with the given nullability and assert the whole
+    /// resulting type equals `expected_paimon` (not just its variant).
+    fn assert_arrow_to_paimon(
+        arrow: &ArrowDataType,
+        nullable: bool,
+        expected_paimon: &PaimonDataType,
+    ) {
+        let paimon = arrow_to_paimon_type(arrow, nullable).unwrap();
+        assert_eq!(&paimon, expected_paimon, "arrow_to_paimon_type mismatch");
+    }
+
+    #[test]
+    fn test_primitive_roundtrip() {
+        let cases: Vec<(PaimonDataType, ArrowDataType)> = vec![
+            (
+                PaimonDataType::Boolean(BooleanType::new()),
+                ArrowDataType::Boolean,
+            ),
+            (
+                PaimonDataType::TinyInt(TinyIntType::new()),
+                ArrowDataType::Int8,
+            ),
+            (
+                PaimonDataType::SmallInt(SmallIntType::new()),
+                ArrowDataType::Int16,
+            ),
+            (PaimonDataType::Int(IntType::new()), ArrowDataType::Int32),
+            (
+                PaimonDataType::BigInt(BigIntType::new()),
+                ArrowDataType::Int64,
+            ),
+            (
+                PaimonDataType::Float(FloatType::new()),
+                ArrowDataType::Float32,
+            ),
+            (
+                PaimonDataType::Double(DoubleType::new()),
+                ArrowDataType::Float64,
+            ),
+            (PaimonDataType::Date(DateType::new()), ArrowDataType::Date32),
+        ];
+        for (paimon, arrow) in &cases {
+            assert_paimon_to_arrow(paimon, arrow);
+            assert_arrow_to_paimon(arrow, true, paimon);
+        }
+    }
+
+    #[test]
+    fn test_string_types() {
+        let varchar =
+            PaimonDataType::VarChar(VarCharType::new(VarCharType::MAX_LENGTH).unwrap());
+        assert_paimon_to_arrow(&varchar, &ArrowDataType::Utf8);
+
+        // All string-like arrow types map to VarChar
+        for arrow in &[
+            ArrowDataType::Utf8,
+            ArrowDataType::LargeUtf8,
+            ArrowDataType::Utf8View,
+        ] {
+            assert_arrow_to_paimon(arrow, true, &varchar);
+        }
+    }
+
+    #[test]
+    fn test_binary_types() {
+        let varbinary = PaimonDataType::VarBinary(
+            VarBinaryType::try_new(true, VarBinaryType::MAX_LENGTH).unwrap(),
+        );
+        assert_paimon_to_arrow(&varbinary, &ArrowDataType::Binary);
+
+        for arrow in &[
+            ArrowDataType::Binary,
+            ArrowDataType::LargeBinary,
+            ArrowDataType::BinaryView,
+        ] {
+            assert_arrow_to_paimon(arrow, true, &varbinary);
+        }
+    }
+
+    #[test]
+    fn test_timestamp_roundtrip() {
+        // millisecond precision
+        let ts3 = PaimonDataType::Timestamp(TimestampType::new(3).unwrap());
+        assert_paimon_to_arrow(&ts3, &ArrowDataType::Timestamp(TimeUnit::Millisecond, None));
+        assert_arrow_to_paimon(
+            &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
+            true,
+            &ts3,
+        );
+
+        // microsecond precision
+        let ts6 = PaimonDataType::Timestamp(TimestampType::new(6).unwrap());
+        assert_paimon_to_arrow(&ts6, &ArrowDataType::Timestamp(TimeUnit::Microsecond, None));
+        assert_arrow_to_paimon(
+            &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
+            true,
+            &ts6,
+        );
+
+        // nanosecond precision
+        let ts9 = PaimonDataType::Timestamp(TimestampType::new(9).unwrap());
+        assert_paimon_to_arrow(&ts9, &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None));
+        assert_arrow_to_paimon(
+            &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
+            true,
+            &ts9,
+        );
+    }
+
+    #[test]
+    fn test_local_zoned_timestamp() {
+        let lzts =
+            PaimonDataType::LocalZonedTimestamp(LocalZonedTimestampType::new(3).unwrap());
+        let arrow = ArrowDataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()));
+        assert_paimon_to_arrow(&lzts, &arrow);
+        assert_arrow_to_paimon(&arrow, true, &lzts);
+    }
+
+    #[test]
+    fn test_time_types() {
+        // Every unit Arrow defines for Time32/Time64 maps to the matching precision.
+        let cases = [
+            (ArrowDataType::Time32(TimeUnit::Second), 0),
+            (ArrowDataType::Time32(TimeUnit::Millisecond), 3),
+            (ArrowDataType::Time64(TimeUnit::Microsecond), 6),
+            (ArrowDataType::Time64(TimeUnit::Nanosecond), 9),
+        ];
+        for (arrow, precision) in &cases {
+            let expected =
+                PaimonDataType::Time(TimeType::with_nullable(true, *precision).unwrap());
+            assert_arrow_to_paimon(arrow, true, &expected);
+        }
+    }
+
+    #[test]
+    fn test_decimal_roundtrip() {
+        let dec = PaimonDataType::Decimal(DecimalType::new(10, 2).unwrap());
+        assert_paimon_to_arrow(&dec, &ArrowDataType::Decimal128(10, 2));
+        assert_arrow_to_paimon(&ArrowDataType::Decimal128(10, 2), true, &dec);
+    }
+
+    #[test]
+    fn test_array_roundtrip() {
+        let paimon_arr = PaimonDataType::Array(ArrayType::new(PaimonDataType::Int(IntType::new())));
+        let arrow_list = ArrowDataType::List(Arc::new(ArrowField::new(
+            "element",
+            ArrowDataType::Int32,
+            true,
+        )));
+        assert_paimon_to_arrow(&paimon_arr, &arrow_list);
+
+        // arrow -> paimon: element field name doesn't matter
+        let arrow_list2 = ArrowDataType::List(Arc::new(ArrowField::new(
+            "item",
+            ArrowDataType::Int32,
+            true,
+        )));
+        let result = arrow_to_paimon_type(&arrow_list2, true).unwrap();
+        let PaimonDataType::Array(array) = &result else {
+            panic!("expected Array type, got {result:?}");
+        };
+        assert!(matches!(array.element_type(), PaimonDataType::Int(_)));
+    }
+
+    #[test]
+    fn test_map_roundtrip() {
+        let paimon_map = PaimonDataType::Map(MapType::new(
+            PaimonDataType::VarChar(VarCharType::new(VarCharType::MAX_LENGTH).unwrap()),
+            PaimonDataType::Int(IntType::new()),
+        ));
+        let arrow_map = paimon_type_to_arrow(&paimon_map).unwrap();
+        let back = arrow_to_paimon_type(&arrow_map, true).unwrap();
+        let PaimonDataType::Map(map) = &back else {
+            panic!("expected Map type, got {back:?}");
+        };
+        assert!(matches!(map.key_type(), PaimonDataType::VarChar(_)));
+    }
+
+    #[test]
+    fn test_row_roundtrip() {
+        let row = PaimonDataType::Row(RowType::new(vec![
+            DataField::new(0, "a".to_string(), PaimonDataType::Int(IntType::new())),
+            DataField::new(
+                1,
+                "b".to_string(),
+                PaimonDataType::VarChar(VarCharType::new(VarCharType::MAX_LENGTH).unwrap()),
+            ),
+        ]));
+        let arrow = paimon_type_to_arrow(&row).unwrap();
+        let back = arrow_to_paimon_type(&arrow, true).unwrap();
+        // Nested field names and types must survive the roundtrip, not just the variant.
+        let PaimonDataType::Row(back_row) = &back else {
+            panic!("expected Row type, got {back:?}");
+        };
+        assert_eq!(back_row.fields().len(), 2);
+        assert_eq!(back_row.fields()[0].name(), "a");
+        assert!(matches!(back_row.fields()[0].data_type(), PaimonDataType::Int(_)));
+        assert_eq!(back_row.fields()[1].name(), "b");
+        assert!(matches!(back_row.fields()[1].data_type(), PaimonDataType::VarChar(_)));
+    }
+
+    #[test]
+    fn test_not_nullable() {
+        let paimon = arrow_to_paimon_type(&ArrowDataType::Int32, false).unwrap();
+        assert!(!paimon.is_nullable());
+
+        let paimon = arrow_to_paimon_type(&ArrowDataType::Int32, true).unwrap();
+        assert!(paimon.is_nullable());
+    }
+
+    #[test]
+    fn test_unsupported_arrow_type() {
+        let result = arrow_to_paimon_type(&ArrowDataType::Duration(TimeUnit::Second), true);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_arrow_fields_to_paimon_ids() {
+        let fields = vec![
+            ArrowField::new("x", ArrowDataType::Int32, true),
+            ArrowField::new("y", ArrowDataType::Utf8, false),
+        ];
+        let paimon_fields = arrow_fields_to_paimon(&fields).unwrap();
+        assert_eq!(paimon_fields.len(), 2);
+        assert_eq!(paimon_fields[0].id(), 0);
+        assert_eq!(paimon_fields[0].name(), "x");
+        assert_eq!(paimon_fields[1].id(), 1);
+        assert_eq!(paimon_fields[1].name(), "y");
+        assert!(!paimon_fields[1].data_type().is_nullable());
+    }
+}
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index b0baf66..f1dba00 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::spec::core_options::CoreOptions;
-use crate::spec::types::{DataType, RowType};
+use crate::spec::types::{ArrayType, DataType, MapType, MultisetType, RowType};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::collections::{HashMap, HashSet};
@@ -473,23 +473,16 @@ impl SchemaBuilder {
}
/// Add a column with optional description.
- ///
- /// TODO: Support RowType in schema columns with field ID assignment for
nested fields.
- /// See <https://github.com/apache/paimon/pull/1547>.
+ ///
+ /// The column takes the builder's next field ID. Any fields nested inside
+ /// `data_type` (rows, including rows inside arrays, maps and multisets) are
+ /// then renumbered by `assign_nested_field_ids`, using the same counter.
pub fn column_with_description(
mut self,
column_name: impl Into<String>,
data_type: DataType,
description: Option<String>,
) -> Self {
- if data_type.contains_row_type() {
- todo!(
- "Column type containing RowType is not supported yet: field ID
assignment for nested row fields is not implemented. See
https://github.com/apache/paimon/pull/1547"
- );
- }
let name = column_name.into();
let id = self.next_field_id;
self.next_field_id += 1;
+ // Nested IDs are drawn after this column's own ID, from the same counter
+ // that supplies top-level column IDs.
+ let data_type = Self::assign_nested_field_ids(data_type, &mut
self.next_field_id);
self.columns
.push(DataField::new(id, name,
data_type).with_description(description));
self
@@ -535,6 +528,40 @@ impl SchemaBuilder {
self.comment,
)
}
+
+    /// Recursively assign fresh field IDs to every field nested inside `data_type`.
+    ///
+    /// IDs are taken from `next_id`, which is advanced past every ID handed out.
+    /// Assignment is pre-order: each row field receives its ID before any fields
+    /// nested in its own type. Array, map (key first, then value) and multiset
+    /// types carry no IDs themselves, but they are traversed so that rows inside
+    /// them are renumbered.
+    ///
+    /// Every rebuilt type keeps its nullability, and every nested field keeps its
+    /// name and description. All other types are returned unchanged.
+    fn assign_nested_field_ids(data_type: DataType, next_id: &mut i32) -> DataType {
+        let nullable = data_type.is_nullable();
+        match data_type {
+            DataType::Row(row) => {
+                let fields = row
+                    .fields()
+                    .iter()
+                    .map(|f| {
+                        let id = *next_id;
+                        *next_id += 1;
+                        let typ = Self::assign_nested_field_ids(f.data_type().clone(), next_id);
+                        // Only the ID (and, recursively, the type) is rewritten. The
+                        // description must be carried over; otherwise comments on
+                        // nested fields are silently dropped from the schema.
+                        DataField::new(id, f.name().to_string(), typ)
+                            .with_description(f.description().map(|d| d.to_string()))
+                    })
+                    .collect();
+                DataType::Row(RowType::with_nullable(nullable, fields))
+            }
+            DataType::Array(arr) => {
+                let element = Self::assign_nested_field_ids(arr.element_type().clone(), next_id);
+                DataType::Array(ArrayType::with_nullable(nullable, element))
+            }
+            DataType::Map(map) => {
+                // Key is visited before value so the numbering is deterministic.
+                let key = Self::assign_nested_field_ids(map.key_type().clone(), next_id);
+                let value = Self::assign_nested_field_ids(map.value_type().clone(), next_id);
+                DataType::Map(MapType::with_nullable(nullable, key, value))
+            }
+            DataType::Multiset(ms) => {
+                let element = Self::assign_nested_field_ids(ms.element_type().clone(), next_id);
+                DataType::Multiset(MultisetType::with_nullable(nullable, element))
+            }
+            // Every other type is returned unchanged.
+            other => other,
+        }
+    }
}
impl Default for SchemaBuilder {
@@ -718,18 +745,29 @@ mod tests {
assert_eq!(schema.primary_keys(), &["a", "b"]);
}
- /// Adding a column whose type is or contains RowType panics (todo! until
field ID assignment for nested row fields).
- /// See <https://github.com/apache/paimon/pull/1547>.
+ /// Adding a column whose type is a RowType succeeds, and the nested field is
+ /// renumbered: its declared ID (0) is replaced by the ID that follows the
+ /// `payload` column's own ID (i.e. 2).
#[test]
- #[should_panic(expected = "RowType")]
- fn test_schema_builder_column_row_type_panics() {
+ fn test_schema_builder_column_row_type() {
let row_type = RowType::new(vec![DataField::new(
0,
"nested".into(),
DataType::Int(IntType::new()),
)]);
+ // The nested field is declared with ID 0; the assertions below expect the
+ // builder to reassign it.
- Schema::builder()
+ let schema = Schema::builder()
.column("id", DataType::Int(IntType::new()))
- .column("payload", DataType::Row(row_type));
+ .column("payload", DataType::Row(row_type))
+ .build()
+ .unwrap();
+
+ assert_eq!(schema.fields().len(), 2);
+ // id gets field_id=0, payload gets field_id=1, nested gets field_id=2
+ assert_eq!(schema.fields()[0].id(), 0);
+ assert_eq!(schema.fields()[1].id(), 1);
+ if let DataType::Row(row) = schema.fields()[1].data_type() {
+ assert_eq!(row.fields().len(), 1);
+ assert_eq!(row.fields()[0].id(), 2);
+ assert_eq!(row.fields()[0].name(), "nested");
+ } else {
+ panic!("expected Row type");
+ }
}
}