This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 7b3c89f  feat(datafusion): Add DDL support with PRIMARY KEY constraint 
syntax (#237)
7b3c89f is described below

commit 7b3c89fdde126d932ec2a716b726f50c8e4af719
Author: Jingsong Lee <[email protected]>
AuthorDate: Sun Apr 12 19:55:06 2026 +0800

    feat(datafusion): Add DDL support with PRIMARY KEY constraint syntax (#237)
    
    - Add PaimonDdlHandler for CREATE TABLE, ALTER TABLE (ADD/DROP/RENAME 
COLUMN, RENAME TABLE)
    - Add PaimonTableFactory for CREATE EXTERNAL TABLE via DataFusion 
TableProviderFactory
    - Extend PaimonCatalogProvider/SchemaProvider with CREATE/DROP SCHEMA and 
DROP TABLE
    - Add arrow_to_paimon_type and arrow_fields_to_paimon to paimon::arrow for 
Arrow-to-Paimon type conversion
    - Support PRIMARY KEY (col, ...) constraint syntax in CREATE TABLE DDL
---
 crates/integrations/datafusion/src/catalog.rs     |   77 ++
 crates/integrations/datafusion/src/ddl.rs         | 1234 +++++++++++++++++++++
 crates/integrations/datafusion/src/lib.rs         |    2 +
 crates/integrations/datafusion/tests/ddl_tests.rs |  497 +++++++++
 crates/paimon/src/arrow/mod.rs                    |  324 +++++-
 crates/paimon/src/spec/schema.rs                  |   68 +-
 6 files changed, 2186 insertions(+), 16 deletions(-)

diff --git a/crates/integrations/datafusion/src/catalog.rs 
b/crates/integrations/datafusion/src/catalog.rs
index 626a47f..7ac66d4 100644
--- a/crates/integrations/datafusion/src/catalog.rs
+++ b/crates/integrations/datafusion/src/catalog.rs
@@ -18,6 +18,7 @@
 //! Paimon catalog integration for DataFusion.
 
 use std::any::Any;
+use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
 
@@ -86,6 +87,50 @@ impl CatalogProvider for PaimonCatalogProvider {
             "paimon catalog access thread panicked",
         )
     }
+
+    fn register_schema(
+        &self,
+        name: &str,
+        _schema: Arc<dyn SchemaProvider>,
+    ) -> DFResult<Option<Arc<dyn SchemaProvider>>> {
+        let catalog = Arc::clone(&self.catalog);
+        let name = name.to_string();
+        block_on_with_runtime(
+            async move {
+                catalog
+                    .create_database(&name, false, HashMap::new())
+                    .await
+                    .map_err(to_datafusion_error)?;
+                Ok(Some(
+                    Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), 
name))
+                        as Arc<dyn SchemaProvider>,
+                ))
+            },
+            "paimon catalog access thread panicked",
+        )
+    }
+
+    fn deregister_schema(
+        &self,
+        name: &str,
+        cascade: bool,
+    ) -> DFResult<Option<Arc<dyn SchemaProvider>>> {
+        let catalog = Arc::clone(&self.catalog);
+        let name = name.to_string();
+        block_on_with_runtime(
+            async move {
+                catalog
+                    .drop_database(&name, false, cascade)
+                    .await
+                    .map_err(to_datafusion_error)?;
+                Ok(Some(
+                    Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), 
name))
+                        as Arc<dyn SchemaProvider>,
+                ))
+            },
+            "paimon catalog access thread panicked",
+        )
+    }
 }
 
 /// Represents a [`SchemaProvider`] for the Paimon [`Catalog`], managing
@@ -159,4 +204,36 @@ impl SchemaProvider for PaimonSchemaProvider {
             "paimon catalog access thread panicked",
         )
     }
+
+    fn register_table(
+        &self,
+        _name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> DFResult<Option<Arc<dyn TableProvider>>> {
+        // DataFusion calls register_table after table creation, so we just
+        // acknowledge it here.
+        Ok(Some(table))
+    }
+
+    fn deregister_table(&self, name: &str) -> DFResult<Option<Arc<dyn 
TableProvider>>> {
+        let catalog = Arc::clone(&self.catalog);
+        let identifier = Identifier::new(self.database.clone(), name);
+        block_on_with_runtime(
+            async move {
+                // Try to get the table first so we can return it.
+                let table = match catalog.get_table(&identifier).await {
+                    Ok(t) => t,
+                    Err(paimon::Error::TableNotExist { .. }) => return 
Ok(None),
+                    Err(e) => return Err(to_datafusion_error(e)),
+                };
+                let provider = PaimonTableProvider::try_new(table)?;
+                catalog
+                    .drop_table(&identifier, false)
+                    .await
+                    .map_err(to_datafusion_error)?;
+                Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
+            },
+            "paimon catalog access thread panicked",
+        )
+    }
 }
diff --git a/crates/integrations/datafusion/src/ddl.rs 
b/crates/integrations/datafusion/src/ddl.rs
new file mode 100644
index 0000000..97eb900
--- /dev/null
+++ b/crates/integrations/datafusion/src/ddl.rs
@@ -0,0 +1,1234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DDL support for Paimon tables.
+//!
+//! DataFusion does not natively support all DDL statements needed by Paimon.
+//! This module provides [`PaimonDdlHandler`] which intercepts CREATE TABLE and
+//! ALTER TABLE SQL, translates them to Paimon catalog operations, and 
delegates
+//! everything else (SELECT, CREATE/DROP SCHEMA, DROP TABLE, etc.) to the
+//! underlying [`SessionContext`].
+//!
+//! Supported DDL:
+//! - `CREATE TABLE db.t (col TYPE, ..., PRIMARY KEY (col, ...)) [PARTITIONED 
BY (col TYPE, ...)] [WITH ('key' = 'val')]`
+//! - `ALTER TABLE db.t ADD COLUMN col TYPE`
+//! - `ALTER TABLE db.t DROP COLUMN col`
+//! - `ALTER TABLE db.t RENAME COLUMN old TO new`
+//! - `ALTER TABLE db.t RENAME TO new_name`
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::StringArray;
+use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::{DataFusionError, Result as DFResult};
+use datafusion::prelude::{DataFrame, SessionContext};
+use datafusion::sql::sqlparser::ast::{
+    AlterTableOperation, ColumnDef, CreateTable, CreateTableOptions, 
HiveDistributionStyle,
+    ObjectName, RenameTableNameKind, SqlOption, Statement,
+};
+use datafusion::sql::sqlparser::dialect::GenericDialect;
+use datafusion::sql::sqlparser::parser::Parser;
+use paimon::catalog::{Catalog, Identifier};
+use paimon::spec::SchemaChange;
+
+use crate::error::to_datafusion_error;
+use paimon::arrow::arrow_to_paimon_type;
+
+/// Wraps a [`SessionContext`] and a Paimon [`Catalog`] to handle DDL 
statements
+/// that DataFusion does not natively support (e.g. ALTER TABLE).
+///
+/// For all other SQL, it delegates to the inner `SessionContext`.
+///
+/// # Example
+/// ```ignore
+/// let handler = PaimonDdlHandler::new(ctx, catalog);
+/// let df = handler.sql("ALTER TABLE paimon.db.t ADD COLUMN age INT").await?;
+/// ```
+pub struct PaimonDdlHandler {
+    ctx: SessionContext,
+    catalog: Arc<dyn Catalog>,
+    /// The catalog name registered in the SessionContext (used to strip the 
catalog prefix).
+    catalog_name: String,
+}
+
+impl PaimonDdlHandler {
+    pub fn new(
+        ctx: SessionContext,
+        catalog: Arc<dyn Catalog>,
+        catalog_name: impl Into<String>,
+    ) -> Self {
+        Self {
+            ctx,
+            catalog,
+            catalog_name: catalog_name.into(),
+        }
+    }
+
+    /// Returns a reference to the inner [`SessionContext`].
+    pub fn ctx(&self) -> &SessionContext {
+        &self.ctx
+    }
+
+    /// Execute a SQL statement. ALTER TABLE is handled by Paimon directly;
+    /// everything else is delegated to DataFusion.
+    pub async fn sql(&self, sql: &str) -> DFResult<DataFrame> {
+        let dialect = GenericDialect {};
+        let statements = Parser::parse_sql(&dialect, sql)
+            .map_err(|e| DataFusionError::Plan(format!("SQL parse error: 
{e}")))?;
+
+        if statements.len() != 1 {
+            return Err(DataFusionError::Plan(
+                "Expected exactly one SQL statement".to_string(),
+            ));
+        }
+
+        match &statements[0] {
+            Statement::CreateTable(create_table) => 
self.handle_create_table(create_table).await,
+            Statement::AlterTable {
+                name,
+                operations,
+                if_exists,
+                ..
+            } => self.handle_alter_table(name, operations, *if_exists).await,
+            _ => self.ctx.sql(sql).await,
+        }
+    }
+
+    async fn handle_create_table(&self, ct: &CreateTable) -> 
DFResult<DataFrame> {
+        if ct.external {
+            return Err(DataFusionError::Plan(
+                "CREATE EXTERNAL TABLE is not supported. Use CREATE TABLE 
instead.".to_string(),
+            ));
+        }
+        if ct.location.is_some() {
+            return Err(DataFusionError::Plan(
+                "LOCATION is not supported for Paimon tables. Table path is 
determined by the catalog warehouse.".to_string(),
+            ));
+        }
+        if ct.query.is_some() {
+            return Err(DataFusionError::Plan(
+                "CREATE TABLE AS SELECT is not yet supported for Paimon 
tables.".to_string(),
+            ));
+        }
+
+        let identifier = self.resolve_table_name(&ct.name)?;
+
+        let mut builder = paimon::spec::Schema::builder();
+
+        // Columns
+        for col in &ct.columns {
+            let arrow_type = sql_data_type_to_arrow(&col.data_type)?;
+            let nullable = !col.options.iter().any(|opt| {
+                matches!(
+                    opt.option,
+                    datafusion::sql::sqlparser::ast::ColumnOption::NotNull
+                )
+            });
+            let paimon_type =
+                arrow_to_paimon_type(&arrow_type, 
nullable).map_err(to_datafusion_error)?;
+            builder = builder.column(col.name.value.clone(), paimon_type);
+        }
+
+        // Primary key from constraints: PRIMARY KEY (col, ...)
+        for constraint in &ct.constraints {
+            if let 
datafusion::sql::sqlparser::ast::TableConstraint::PrimaryKey {
+                columns, ..
+            } = constraint
+            {
+                let pk_cols: Vec<String> =
+                    columns.iter().map(|c| 
c.column.expr.to_string()).collect();
+                builder = builder.primary_key(pk_cols);
+            }
+        }
+
+        // Partition keys from PARTITIONED BY (col, ...)
+        if let HiveDistributionStyle::PARTITIONED { columns } = 
&ct.hive_distribution {
+            let partition_keys: Vec<String> =
+                columns.iter().map(|c| c.name.value.clone()).collect();
+            builder = builder.partition_keys(partition_keys);
+        }
+
+        // Table options from WITH ('key' = 'value', ...)
+        for (k, v) in extract_options(&ct.table_options)? {
+            builder = builder.option(k, v);
+        }
+
+        let schema = builder.build().map_err(to_datafusion_error)?;
+
+        self.catalog
+            .create_table(&identifier, schema, ct.if_not_exists)
+            .await
+            .map_err(to_datafusion_error)?;
+
+        ok_result(&self.ctx)
+    }
+
+    async fn handle_alter_table(
+        &self,
+        name: &ObjectName,
+        operations: &[AlterTableOperation],
+        if_exists: bool,
+    ) -> DFResult<DataFrame> {
+        let identifier = self.resolve_table_name(name)?;
+
+        let mut changes = Vec::new();
+        let mut rename_to: Option<Identifier> = None;
+
+        for op in operations {
+            match op {
+                AlterTableOperation::AddColumn { column_def, .. } => {
+                    let change = column_def_to_add_column(column_def)?;
+                    changes.push(change);
+                }
+                AlterTableOperation::DropColumn {
+                    column_names,
+                    if_exists: _,
+                    ..
+                } => {
+                    for col in column_names {
+                        
changes.push(SchemaChange::drop_column(col.value.clone()));
+                    }
+                }
+                AlterTableOperation::RenameColumn {
+                    old_column_name,
+                    new_column_name,
+                } => {
+                    changes.push(SchemaChange::rename_column(
+                        old_column_name.value.clone(),
+                        new_column_name.value.clone(),
+                    ));
+                }
+                AlterTableOperation::RenameTable { table_name } => {
+                    let new_name = match table_name {
+                        RenameTableNameKind::To(name) | 
RenameTableNameKind::As(name) => {
+                            object_name_to_string(name)
+                        }
+                    };
+                    rename_to = 
Some(Identifier::new(identifier.database().to_string(), new_name));
+                }
+                other => {
+                    return Err(DataFusionError::Plan(format!(
+                        "Unsupported ALTER TABLE operation: {other}"
+                    )));
+                }
+            }
+        }
+
+        if let Some(new_identifier) = rename_to {
+            self.catalog
+                .rename_table(&identifier, &new_identifier, if_exists)
+                .await
+                .map_err(to_datafusion_error)?;
+        }
+
+        if !changes.is_empty() {
+            self.catalog
+                .alter_table(&identifier, changes, if_exists)
+                .await
+                .map_err(to_datafusion_error)?;
+        }
+
+        ok_result(&self.ctx)
+    }
+
+    /// Resolve an ObjectName like `paimon.db.table` or `db.table` to a Paimon 
Identifier.
+    fn resolve_table_name(&self, name: &ObjectName) -> DFResult<Identifier> {
+        let parts: Vec<String> = name
+            .0
+            .iter()
+            .filter_map(|p| p.as_ident().map(|id| id.value.clone()))
+            .collect();
+        match parts.len() {
+            3 => {
+                // catalog.database.table — strip catalog prefix
+                if parts[0] != self.catalog_name {
+                    return Err(DataFusionError::Plan(format!(
+                        "Unknown catalog '{}', expected '{}'",
+                        parts[0], self.catalog_name
+                    )));
+                }
+                Ok(Identifier::new(parts[1].clone(), parts[2].clone()))
+            }
+            2 => Ok(Identifier::new(parts[0].clone(), parts[1].clone())),
+            1 => Err(DataFusionError::Plan(format!(
+                "ALTER TABLE requires at least database.table, got: {}",
+                parts[0]
+            ))),
+            _ => Err(DataFusionError::Plan(format!(
+                "Invalid table reference: {name}"
+            ))),
+        }
+    }
+}
+
+/// Convert a sqlparser [`ColumnDef`] to a Paimon [`SchemaChange::AddColumn`].
+fn column_def_to_add_column(col: &ColumnDef) -> DFResult<SchemaChange> {
+    let arrow_type = sql_data_type_to_arrow(&col.data_type)?;
+    let nullable = !col.options.iter().any(|opt| {
+        matches!(
+            opt.option,
+            datafusion::sql::sqlparser::ast::ColumnOption::NotNull
+        )
+    });
+    let paimon_type = arrow_to_paimon_type(&arrow_type, 
nullable).map_err(to_datafusion_error)?;
+    Ok(SchemaChange::add_column(
+        col.name.value.clone(),
+        paimon_type,
+    ))
+}
+
+/// Convert a sqlparser SQL data type to an Arrow data type.
+fn sql_data_type_to_arrow(
+    sql_type: &datafusion::sql::sqlparser::ast::DataType,
+) -> DFResult<ArrowDataType> {
+    use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as 
SqlType};
+    match sql_type {
+        SqlType::Boolean => Ok(ArrowDataType::Boolean),
+        SqlType::TinyInt(_) => Ok(ArrowDataType::Int8),
+        SqlType::SmallInt(_) => Ok(ArrowDataType::Int16),
+        SqlType::Int(_) | SqlType::Integer(_) => Ok(ArrowDataType::Int32),
+        SqlType::BigInt(_) => Ok(ArrowDataType::Int64),
+        SqlType::Float(_) => Ok(ArrowDataType::Float32),
+        SqlType::Real => Ok(ArrowDataType::Float32),
+        SqlType::Double(_) | SqlType::DoublePrecision => 
Ok(ArrowDataType::Float64),
+        SqlType::Varchar(_) | SqlType::CharVarying(_) | SqlType::Text | 
SqlType::String(_) => {
+            Ok(ArrowDataType::Utf8)
+        }
+        SqlType::Char(_) | SqlType::Character(_) => Ok(ArrowDataType::Utf8),
+        SqlType::Binary(_) | SqlType::Varbinary(_) | SqlType::Blob(_) | 
SqlType::Bytea => {
+            Ok(ArrowDataType::Binary)
+        }
+        SqlType::Date => Ok(ArrowDataType::Date32),
+        SqlType::Timestamp(precision, tz_info) => {
+            use datafusion::sql::sqlparser::ast::TimezoneInfo;
+            let unit = match precision {
+                Some(0) => datafusion::arrow::datatypes::TimeUnit::Second,
+                Some(1..=3) | None => 
datafusion::arrow::datatypes::TimeUnit::Millisecond,
+                Some(4..=6) => 
datafusion::arrow::datatypes::TimeUnit::Microsecond,
+                _ => datafusion::arrow::datatypes::TimeUnit::Nanosecond,
+            };
+            let tz = match tz_info {
+                TimezoneInfo::None | TimezoneInfo::WithoutTimeZone => None,
+                _ => Some("UTC".into()),
+            };
+            Ok(ArrowDataType::Timestamp(unit, tz))
+        }
+        SqlType::Decimal(info) => {
+            use datafusion::sql::sqlparser::ast::ExactNumberInfo;
+            let (p, s) = match info {
+                ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as 
i8),
+                ExactNumberInfo::Precision(p) => (*p as u8, 0),
+                ExactNumberInfo::None => (10, 0),
+            };
+            Ok(ArrowDataType::Decimal128(p, s))
+        }
+        SqlType::Array(elem_def) => {
+            let elem_type = match elem_def {
+                ArrayElemTypeDef::AngleBracket(t)
+                | ArrayElemTypeDef::SquareBracket(t, _)
+                | ArrayElemTypeDef::Parenthesis(t) => 
sql_data_type_to_arrow(t)?,
+                ArrayElemTypeDef::None => {
+                    return Err(DataFusionError::Plan(
+                        "ARRAY type requires an element type".to_string(),
+                    ));
+                }
+            };
+            Ok(ArrowDataType::List(Arc::new(Field::new(
+                "element", elem_type, true,
+            ))))
+        }
+        SqlType::Map(key_type, value_type) => {
+            let key = sql_data_type_to_arrow(key_type)?;
+            let value = sql_data_type_to_arrow(value_type)?;
+            let entries = Field::new(
+                "entries",
+                ArrowDataType::Struct(
+                    vec![
+                        Field::new("key", key, false),
+                        Field::new("value", value, true),
+                    ]
+                    .into(),
+                ),
+                false,
+            );
+            Ok(ArrowDataType::Map(Arc::new(entries), false))
+        }
+        SqlType::Struct(fields, _) => {
+            let arrow_fields: Vec<Field> = fields
+                .iter()
+                .map(|f| {
+                    let name = f
+                        .field_name
+                        .as_ref()
+                        .map(|n| n.value.clone())
+                        .unwrap_or_default();
+                    let dt = sql_data_type_to_arrow(&f.field_type)?;
+                    Ok(Field::new(name, dt, true))
+                })
+                .collect::<DFResult<_>>()?;
+            Ok(ArrowDataType::Struct(arrow_fields.into()))
+        }
+        _ => Err(DataFusionError::Plan(format!(
+            "Unsupported SQL data type: {sql_type}"
+        ))),
+    }
+}
+
+fn object_name_to_string(name: &ObjectName) -> String {
+    name.0
+        .iter()
+        .filter_map(|p| p.as_ident().map(|id| id.value.clone()))
+        .collect::<Vec<_>>()
+        .join(".")
+}
+
+/// Extract key-value pairs from [`CreateTableOptions`].
+fn extract_options(opts: &CreateTableOptions) -> DFResult<Vec<(String, 
String)>> {
+    let sql_options = match opts {
+        CreateTableOptions::With(options)
+        | CreateTableOptions::Options(options)
+        | CreateTableOptions::TableProperties(options)
+        | CreateTableOptions::Plain(options) => options,
+        CreateTableOptions::None => return Ok(Vec::new()),
+    };
+    sql_options
+        .iter()
+        .map(|opt| match opt {
+            SqlOption::KeyValue { key, value } => {
+                let v = value.to_string();
+                // Strip surrounding quotes from the value if present.
+                let v = v
+                    .strip_prefix('\'')
+                    .and_then(|s| s.strip_suffix('\''))
+                    .unwrap_or(&v)
+                    .to_string();
+                Ok((key.value.clone(), v))
+            }
+            other => Err(DataFusionError::Plan(format!(
+                "Unsupported table option: {other}"
+            ))),
+        })
+        .collect()
+}
+
+/// Return an empty DataFrame with a single "result" column containing "OK".
+fn ok_result(ctx: &SessionContext) -> DFResult<DataFrame> {
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "result",
+        ArrowDataType::Utf8,
+        false,
+    )]));
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(StringArray::from(vec!["OK"]))],
+    )?;
+    let df = ctx.read_batch(batch)?;
+    Ok(df)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::collections::HashMap;
+    use std::sync::Mutex;
+
+    use async_trait::async_trait;
+    use datafusion::arrow::datatypes::TimeUnit;
+    use paimon::catalog::Database;
+    use paimon::spec::Schema as PaimonSchema;
+    use paimon::table::Table;
+
+    // ==================== Mock Catalog ====================
+
+    #[allow(clippy::enum_variant_names)]
+    #[derive(Debug)]
+    enum CatalogCall {
+        CreateTable {
+            identifier: Identifier,
+            schema: PaimonSchema,
+            ignore_if_exists: bool,
+        },
+        AlterTable {
+            identifier: Identifier,
+            changes: Vec<SchemaChange>,
+            ignore_if_not_exists: bool,
+        },
+        RenameTable {
+            from: Identifier,
+            to: Identifier,
+            ignore_if_not_exists: bool,
+        },
+    }
+
+    struct MockCatalog {
+        calls: Mutex<Vec<CatalogCall>>,
+    }
+
+    impl MockCatalog {
+        fn new() -> Self {
+            Self {
+                calls: Mutex::new(Vec::new()),
+            }
+        }
+
+        fn take_calls(&self) -> Vec<CatalogCall> {
+            std::mem::take(&mut *self.calls.lock().unwrap())
+        }
+    }
+
+    #[async_trait]
+    impl Catalog for MockCatalog {
+        async fn list_databases(&self) -> paimon::Result<Vec<String>> {
+            Ok(vec![])
+        }
+        async fn create_database(
+            &self,
+            _name: &str,
+            _ignore_if_exists: bool,
+            _properties: HashMap<String, String>,
+        ) -> paimon::Result<()> {
+            Ok(())
+        }
+        async fn get_database(&self, _name: &str) -> paimon::Result<Database> {
+            unimplemented!()
+        }
+        async fn drop_database(
+            &self,
+            _name: &str,
+            _ignore_if_not_exists: bool,
+            _cascade: bool,
+        ) -> paimon::Result<()> {
+            Ok(())
+        }
+        async fn get_table(&self, _identifier: &Identifier) -> 
paimon::Result<Table> {
+            unimplemented!()
+        }
+        async fn list_tables(&self, _database_name: &str) -> 
paimon::Result<Vec<String>> {
+            Ok(vec![])
+        }
+        async fn create_table(
+            &self,
+            identifier: &Identifier,
+            creation: PaimonSchema,
+            ignore_if_exists: bool,
+        ) -> paimon::Result<()> {
+            self.calls.lock().unwrap().push(CatalogCall::CreateTable {
+                identifier: identifier.clone(),
+                schema: creation,
+                ignore_if_exists,
+            });
+            Ok(())
+        }
+        async fn drop_table(
+            &self,
+            _identifier: &Identifier,
+            _ignore_if_not_exists: bool,
+        ) -> paimon::Result<()> {
+            Ok(())
+        }
+        async fn rename_table(
+            &self,
+            from: &Identifier,
+            to: &Identifier,
+            ignore_if_not_exists: bool,
+        ) -> paimon::Result<()> {
+            self.calls.lock().unwrap().push(CatalogCall::RenameTable {
+                from: from.clone(),
+                to: to.clone(),
+                ignore_if_not_exists,
+            });
+            Ok(())
+        }
+        async fn alter_table(
+            &self,
+            identifier: &Identifier,
+            changes: Vec<SchemaChange>,
+            ignore_if_not_exists: bool,
+        ) -> paimon::Result<()> {
+            self.calls.lock().unwrap().push(CatalogCall::AlterTable {
+                identifier: identifier.clone(),
+                changes,
+                ignore_if_not_exists,
+            });
+            Ok(())
+        }
+    }
+
+    fn make_handler(catalog: Arc<MockCatalog>) -> PaimonDdlHandler {
+        PaimonDdlHandler::new(SessionContext::new(), catalog, "paimon")
+    }
+
+    // ==================== sql_data_type_to_arrow tests ====================
+
+    #[test]
+    fn test_sql_type_boolean() {
+        use datafusion::sql::sqlparser::ast::DataType as SqlType;
+        assert_eq!(
+            sql_data_type_to_arrow(&SqlType::Boolean).unwrap(),
+            ArrowDataType::Boolean
+        );
+    }
+
+    #[test]
+    fn test_sql_type_integers() {
+        use datafusion::sql::sqlparser::ast::DataType as SqlType;
+        assert_eq!(
+            sql_data_type_to_arrow(&SqlType::TinyInt(None)).unwrap(),
+            ArrowDataType::Int8
+        );
+        assert_eq!(
+            sql_data_type_to_arrow(&SqlType::SmallInt(None)).unwrap(),
+            ArrowDataType::Int16
+        );
+        assert_eq!(
+            sql_data_type_to_arrow(&SqlType::Int(None)).unwrap(),
+            ArrowDataType::Int32
+        );
+        assert_eq!(
+            sql_data_type_to_arrow(&SqlType::Integer(None)).unwrap(),
+            ArrowDataType::Int32
+        );
+        assert_eq!(
+            sql_data_type_to_arrow(&SqlType::BigInt(None)).unwrap(),
+            ArrowDataType::Int64
+        );
+    }
+
+    #[test]
+    fn test_sql_type_floats() {
+        use datafusion::sql::sqlparser::ast::{DataType as SqlType, 
ExactNumberInfo};
+        assert_eq!(
+            
sql_data_type_to_arrow(&SqlType::Float(ExactNumberInfo::None)).unwrap(),
+            ArrowDataType::Float32
+        );
+        assert_eq!(
+            sql_data_type_to_arrow(&SqlType::Real).unwrap(),
+            ArrowDataType::Float32
+        );
+        assert_eq!(
+            sql_data_type_to_arrow(&SqlType::DoublePrecision).unwrap(),
+            ArrowDataType::Float64
+        );
+    }
+
+    #[test]
+    fn test_sql_type_string_variants() {
+        use datafusion::sql::sqlparser::ast::DataType as SqlType;
+        for sql_type in [SqlType::Varchar(None), SqlType::Text, 
SqlType::String(None)] {
+            assert_eq!(
+                sql_data_type_to_arrow(&sql_type).unwrap(),
+                ArrowDataType::Utf8,
+                "failed for {sql_type:?}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_sql_type_binary() {
+        use datafusion::sql::sqlparser::ast::DataType as SqlType;
+        assert_eq!(
+            sql_data_type_to_arrow(&SqlType::Bytea).unwrap(),
+            ArrowDataType::Binary
+        );
+    }
+
+    #[test]
+    fn test_sql_type_date() {
+        use datafusion::sql::sqlparser::ast::DataType as SqlType;
+        assert_eq!(
+            sql_data_type_to_arrow(&SqlType::Date).unwrap(),
+            ArrowDataType::Date32
+        );
+    }
+
+    #[test]
+    fn test_sql_type_timestamp_default() {
+        use datafusion::sql::sqlparser::ast::{DataType as SqlType, 
TimezoneInfo};
+        let result = sql_data_type_to_arrow(&SqlType::Timestamp(None, 
TimezoneInfo::None)).unwrap();
+        assert_eq!(
+            result,
+            ArrowDataType::Timestamp(TimeUnit::Millisecond, None)
+        );
+    }
+
+    #[test]
+    fn test_sql_type_timestamp_with_precision() {
+        use datafusion::sql::sqlparser::ast::{DataType as SqlType, 
TimezoneInfo};
+        // precision 0 => Second
+        assert_eq!(
+            sql_data_type_to_arrow(&SqlType::Timestamp(Some(0), 
TimezoneInfo::None)).unwrap(),
+            ArrowDataType::Timestamp(TimeUnit::Second, None)
+        );
+        // precision 3 => Millisecond
+        assert_eq!(
+            sql_data_type_to_arrow(&SqlType::Timestamp(Some(3), 
TimezoneInfo::None)).unwrap(),
+            ArrowDataType::Timestamp(TimeUnit::Millisecond, None)
+        );
+        // precision 6 => Microsecond
+        assert_eq!(
+            sql_data_type_to_arrow(&SqlType::Timestamp(Some(6), 
TimezoneInfo::None)).unwrap(),
+            ArrowDataType::Timestamp(TimeUnit::Microsecond, None)
+        );
+        // precision 9 => Nanosecond
+        assert_eq!(
+            sql_data_type_to_arrow(&SqlType::Timestamp(Some(9), 
TimezoneInfo::None)).unwrap(),
+            ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)
+        );
+    }
+
+    #[test]
+    fn test_sql_type_timestamp_with_tz() {
+        use datafusion::sql::sqlparser::ast::{DataType as SqlType, 
TimezoneInfo};
+        let result =
+            sql_data_type_to_arrow(&SqlType::Timestamp(None, 
TimezoneInfo::WithTimeZone)).unwrap();
+        assert_eq!(
+            result,
+            ArrowDataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()))
+        );
+    }
+
+    #[test]
+    fn test_sql_type_decimal() {
+        use datafusion::sql::sqlparser::ast::{DataType as SqlType, 
ExactNumberInfo};
+        assert_eq!(
+            
sql_data_type_to_arrow(&SqlType::Decimal(ExactNumberInfo::PrecisionAndScale(18, 
2)))
+                .unwrap(),
+            ArrowDataType::Decimal128(18, 2)
+        );
+        assert_eq!(
+            
sql_data_type_to_arrow(&SqlType::Decimal(ExactNumberInfo::Precision(10))).unwrap(),
+            ArrowDataType::Decimal128(10, 0)
+        );
+        assert_eq!(
+            
sql_data_type_to_arrow(&SqlType::Decimal(ExactNumberInfo::None)).unwrap(),
+            ArrowDataType::Decimal128(10, 0)
+        );
+    }
+
+    #[test]
+    fn test_sql_type_unsupported() {
+        use datafusion::sql::sqlparser::ast::DataType as SqlType;
+        assert!(sql_data_type_to_arrow(&SqlType::Regclass).is_err());
+    }
+
+    #[test]
+    fn test_sql_type_array() {
+        use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as 
SqlType};
+        let result = 
sql_data_type_to_arrow(&SqlType::Array(ArrayElemTypeDef::AngleBracket(
+            Box::new(SqlType::Int(None)),
+        )))
+        .unwrap();
+        assert_eq!(
+            result,
+            ArrowDataType::List(Arc::new(Field::new("element", 
ArrowDataType::Int32, true)))
+        );
+    }
+
+    #[test]
+    fn test_sql_type_array_no_element() {
+        use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as 
SqlType};
+        
assert!(sql_data_type_to_arrow(&SqlType::Array(ArrayElemTypeDef::None)).is_err());
+    }
+
+    #[test]
+    fn test_sql_type_map() {
+        use datafusion::sql::sqlparser::ast::DataType as SqlType;
+        let result = sql_data_type_to_arrow(&SqlType::Map(
+            Box::new(SqlType::Varchar(None)),
+            Box::new(SqlType::Int(None)),
+        ))
+        .unwrap();
+        let expected = ArrowDataType::Map(
+            Arc::new(Field::new(
+                "entries",
+                ArrowDataType::Struct(
+                    vec![
+                        Field::new("key", ArrowDataType::Utf8, false),
+                        Field::new("value", ArrowDataType::Int32, true),
+                    ]
+                    .into(),
+                ),
+                false,
+            )),
+            false,
+        );
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_sql_type_struct() {
+        use datafusion::sql::sqlparser::ast::{
+            DataType as SqlType, Ident, StructBracketKind, StructField,
+        };
+        let result = sql_data_type_to_arrow(&SqlType::Struct(
+            vec![
+                StructField {
+                    field_name: Some(Ident::new("name")),
+                    field_type: SqlType::Varchar(None),
+                    options: None,
+                },
+                StructField {
+                    field_name: Some(Ident::new("age")),
+                    field_type: SqlType::Int(None),
+                    options: None,
+                },
+            ],
+            StructBracketKind::AngleBrackets,
+        ))
+        .unwrap();
+        assert_eq!(
+            result,
+            ArrowDataType::Struct(
+                vec![
+                    Field::new("name", ArrowDataType::Utf8, true),
+                    Field::new("age", ArrowDataType::Int32, true),
+                ]
+                .into()
+            )
+        );
+    }
+
+    // ==================== resolve_table_name tests ====================
+
+    #[test]
+    fn test_resolve_three_part_name() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog);
+        let dialect = GenericDialect {};
+        let stmts = Parser::parse_sql(&dialect, "SELECT * FROM 
paimon.mydb.mytable").unwrap();
+        if let Statement::Query(q) = &stmts[0] {
+            if let datafusion::sql::sqlparser::ast::SetExpr::Select(sel) = 
q.body.as_ref() {
+                if let datafusion::sql::sqlparser::ast::TableFactor::Table { 
name, .. } =
+                    &sel.from[0].relation
+                {
+                    let id = handler.resolve_table_name(name).unwrap();
+                    assert_eq!(id.database(), "mydb");
+                    assert_eq!(id.object(), "mytable");
+                }
+            }
+        }
+    }
+
+    #[test]
+    fn test_resolve_two_part_name() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog);
+        let dialect = GenericDialect {};
+        let stmts = Parser::parse_sql(&dialect, "SELECT * FROM 
mydb.mytable").unwrap();
+        if let Statement::Query(q) = &stmts[0] {
+            if let datafusion::sql::sqlparser::ast::SetExpr::Select(sel) = 
q.body.as_ref() {
+                if let datafusion::sql::sqlparser::ast::TableFactor::Table { 
name, .. } =
+                    &sel.from[0].relation
+                {
+                    let id = handler.resolve_table_name(name).unwrap();
+                    assert_eq!(id.database(), "mydb");
+                    assert_eq!(id.object(), "mytable");
+                }
+            }
+        }
+    }
+
+    #[test]
+    fn test_resolve_wrong_catalog_name() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog);
+        let dialect = GenericDialect {};
+        let stmts = Parser::parse_sql(&dialect, "SELECT * FROM 
other.mydb.mytable").unwrap();
+        if let Statement::Query(q) = &stmts[0] {
+            if let datafusion::sql::sqlparser::ast::SetExpr::Select(sel) = 
q.body.as_ref() {
+                if let datafusion::sql::sqlparser::ast::TableFactor::Table { 
name, .. } =
+                    &sel.from[0].relation
+                {
+                    let err = handler.resolve_table_name(name).unwrap_err();
+                    assert!(err.to_string().contains("Unknown catalog"));
+                }
+            }
+        }
+    }
+
+    #[test]
+    fn test_resolve_single_part_name_error() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog);
+        let dialect = GenericDialect {};
+        let stmts = Parser::parse_sql(&dialect, "SELECT * FROM 
mytable").unwrap();
+        if let Statement::Query(q) = &stmts[0] {
+            if let datafusion::sql::sqlparser::ast::SetExpr::Select(sel) = 
q.body.as_ref() {
+                if let datafusion::sql::sqlparser::ast::TableFactor::Table { 
name, .. } =
+                    &sel.from[0].relation
+                {
+                    let err = handler.resolve_table_name(name).unwrap_err();
+                    assert!(err.to_string().contains("at least 
database.table"));
+                }
+            }
+        }
+    }
+
+    // ==================== extract_options tests ====================
+
+    #[test]
+    fn test_extract_options_none() {
+        let opts = extract_options(&CreateTableOptions::None).unwrap();
+        assert!(opts.is_empty());
+    }
+
+    #[test]
+    fn test_extract_options_with_kv() {
+        // Parse a CREATE TABLE with WITH options to get a real 
CreateTableOptions
+        let dialect = GenericDialect {};
+        let stmts =
+            Parser::parse_sql(&dialect, "CREATE TABLE t (id INT) WITH 
('bucket' = '4')").unwrap();
+        if let Statement::CreateTable(ct) = &stmts[0] {
+            let opts = extract_options(&ct.table_options).unwrap();
+            assert_eq!(opts.len(), 1);
+            assert_eq!(opts[0].0, "bucket");
+            assert_eq!(opts[0].1, "4");
+        } else {
+            panic!("expected CreateTable");
+        }
+    }
+
+    // ==================== PaimonDdlHandler::sql integration tests 
====================
+
+    #[tokio::test]
+    async fn test_create_table_basic() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog.clone());
+
+        handler
+            .sql("CREATE TABLE mydb.t1 (id INT NOT NULL, name VARCHAR, PRIMARY 
KEY (id))")
+            .await
+            .unwrap();
+
+        let calls = catalog.take_calls();
+        assert_eq!(calls.len(), 1);
+        if let CatalogCall::CreateTable {
+            identifier,
+            schema,
+            ignore_if_exists,
+        } = &calls[0]
+        {
+            assert_eq!(identifier.database(), "mydb");
+            assert_eq!(identifier.object(), "t1");
+            assert!(!ignore_if_exists);
+            assert_eq!(schema.primary_keys(), &["id"]);
+        } else {
+            panic!("expected CreateTable call");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_create_table_if_not_exists() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog.clone());
+
+        handler
+            .sql("CREATE TABLE IF NOT EXISTS mydb.t1 (id INT)")
+            .await
+            .unwrap();
+
+        let calls = catalog.take_calls();
+        assert_eq!(calls.len(), 1);
+        if let CatalogCall::CreateTable {
+            ignore_if_exists, ..
+        } = &calls[0]
+        {
+            assert!(ignore_if_exists);
+        } else {
+            panic!("expected CreateTable call");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_create_table_with_options() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog.clone());
+
+        handler
+            .sql("CREATE TABLE mydb.t1 (id INT) WITH ('bucket' = '4', 
'file.format' = 'parquet')")
+            .await
+            .unwrap();
+
+        let calls = catalog.take_calls();
+        assert_eq!(calls.len(), 1);
+        if let CatalogCall::CreateTable { schema, .. } = &calls[0] {
+            let opts = schema.options();
+            assert_eq!(opts.get("bucket").unwrap(), "4");
+            assert_eq!(opts.get("file.format").unwrap(), "parquet");
+        } else {
+            panic!("expected CreateTable call");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_create_table_three_part_name() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog.clone());
+
+        handler
+            .sql("CREATE TABLE paimon.mydb.t1 (id INT)")
+            .await
+            .unwrap();
+
+        let calls = catalog.take_calls();
+        if let CatalogCall::CreateTable { identifier, .. } = &calls[0] {
+            assert_eq!(identifier.database(), "mydb");
+            assert_eq!(identifier.object(), "t1");
+        } else {
+            panic!("expected CreateTable call");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_alter_table_add_column() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog.clone());
+
+        handler
+            .sql("ALTER TABLE mydb.t1 ADD COLUMN age INT")
+            .await
+            .unwrap();
+
+        let calls = catalog.take_calls();
+        assert_eq!(calls.len(), 1);
+        if let CatalogCall::AlterTable {
+            identifier,
+            changes,
+            ..
+        } = &calls[0]
+        {
+            assert_eq!(identifier.database(), "mydb");
+            assert_eq!(identifier.object(), "t1");
+            assert_eq!(changes.len(), 1);
+            assert!(
+                matches!(&changes[0], SchemaChange::AddColumn { field_name, .. 
} if field_name == "age")
+            );
+        } else {
+            panic!("expected AlterTable call");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_alter_table_drop_column() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog.clone());
+
+        handler
+            .sql("ALTER TABLE mydb.t1 DROP COLUMN age")
+            .await
+            .unwrap();
+
+        let calls = catalog.take_calls();
+        assert_eq!(calls.len(), 1);
+        if let CatalogCall::AlterTable { changes, .. } = &calls[0] {
+            assert_eq!(changes.len(), 1);
+            assert!(
+                matches!(&changes[0], SchemaChange::DropColumn { field_name } 
if field_name == "age")
+            );
+        } else {
+            panic!("expected AlterTable call");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_alter_table_rename_column() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog.clone());
+
+        handler
+            .sql("ALTER TABLE mydb.t1 RENAME COLUMN old_name TO new_name")
+            .await
+            .unwrap();
+
+        let calls = catalog.take_calls();
+        assert_eq!(calls.len(), 1);
+        if let CatalogCall::AlterTable { changes, .. } = &calls[0] {
+            assert_eq!(changes.len(), 1);
+            assert!(matches!(
+                &changes[0],
+                SchemaChange::RenameColumn { field_name, new_name }
+                    if field_name == "old_name" && new_name == "new_name"
+            ));
+        } else {
+            panic!("expected AlterTable call");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_alter_table_rename_table() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog.clone());
+
+        handler
+            .sql("ALTER TABLE mydb.t1 RENAME TO t2")
+            .await
+            .unwrap();
+
+        let calls = catalog.take_calls();
+        assert_eq!(calls.len(), 1);
+        if let CatalogCall::RenameTable { from, to, .. } = &calls[0] {
+            assert_eq!(from.database(), "mydb");
+            assert_eq!(from.object(), "t1");
+            assert_eq!(to.database(), "mydb");
+            assert_eq!(to.object(), "t2");
+        } else {
+            panic!("expected RenameTable call");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_alter_table_if_exists_add_column() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog.clone());
+
+        handler
+            .sql("ALTER TABLE IF EXISTS mydb.t1 ADD COLUMN age INT")
+            .await
+            .unwrap();
+
+        let calls = catalog.take_calls();
+        assert_eq!(calls.len(), 1);
+        if let CatalogCall::AlterTable {
+            ignore_if_not_exists,
+            ..
+        } = &calls[0]
+        {
+            assert!(ignore_if_not_exists);
+        } else {
+            panic!("expected AlterTable call");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_alter_table_without_if_exists() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog.clone());
+
+        handler
+            .sql("ALTER TABLE mydb.t1 ADD COLUMN age INT")
+            .await
+            .unwrap();
+
+        let calls = catalog.take_calls();
+        if let CatalogCall::AlterTable {
+            ignore_if_not_exists,
+            ..
+        } = &calls[0]
+        {
+            assert!(!ignore_if_not_exists);
+        } else {
+            panic!("expected AlterTable call");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_alter_table_if_exists_rename() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog.clone());
+
+        handler
+            .sql("ALTER TABLE IF EXISTS mydb.t1 RENAME TO t2")
+            .await
+            .unwrap();
+
+        let calls = catalog.take_calls();
+        assert_eq!(calls.len(), 1);
+        if let CatalogCall::RenameTable {
+            from,
+            to,
+            ignore_if_not_exists,
+        } = &calls[0]
+        {
+            assert!(ignore_if_not_exists);
+            assert_eq!(from.object(), "t1");
+            assert_eq!(to.object(), "t2");
+        } else {
+            panic!("expected RenameTable call");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_alter_table_rename_three_part_name() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog.clone());
+
+        handler
+            .sql("ALTER TABLE paimon.mydb.t1 RENAME TO t2")
+            .await
+            .unwrap();
+
+        let calls = catalog.take_calls();
+        assert_eq!(calls.len(), 1);
+        if let CatalogCall::RenameTable { from, to, .. } = &calls[0] {
+            assert_eq!(from.database(), "mydb");
+            assert_eq!(from.object(), "t1");
+            assert_eq!(to.database(), "mydb");
+            assert_eq!(to.object(), "t2");
+        } else {
+            panic!("expected RenameTable call");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_sql_parse_error() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog);
+        let result = handler.sql("NOT VALID SQL !!!").await;
+        assert!(result.is_err());
+        assert!(result.unwrap_err().to_string().contains("SQL parse error"));
+    }
+
+    #[tokio::test]
+    async fn test_multiple_statements_error() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog);
+        let result = handler.sql("SELECT 1; SELECT 2").await;
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("exactly one SQL statement"));
+    }
+
+    #[tokio::test]
+    async fn test_create_external_table_rejected() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog);
+        let result = handler
+            .sql("CREATE EXTERNAL TABLE mydb.t1 (id INT) STORED AS PARQUET")
+            .await;
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("CREATE EXTERNAL TABLE is not supported"));
+    }
+
+    #[tokio::test]
+    async fn test_non_ddl_delegates_to_datafusion() {
+        let catalog = Arc::new(MockCatalog::new());
+        let handler = make_handler(catalog.clone());
+        // SELECT should be delegated to DataFusion, not intercepted
+        let df = handler.sql("SELECT 1 AS x").await.unwrap();
+        let batches = df.collect().await.unwrap();
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 1);
+        // No catalog calls
+        assert!(catalog.take_calls().is_empty());
+    }
+}
diff --git a/crates/integrations/datafusion/src/lib.rs 
b/crates/integrations/datafusion/src/lib.rs
index 4e9fdb3..e803153 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -37,6 +37,7 @@
 //! translatable partition-only conjuncts from DataFusion filters.
 
 mod catalog;
+mod ddl;
 mod error;
 mod filter_pushdown;
 #[cfg(feature = "fulltext")]
@@ -47,6 +48,7 @@ pub mod runtime;
 mod table;
 
 pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
+pub use ddl::PaimonDdlHandler;
 pub use error::to_datafusion_error;
 #[cfg(feature = "fulltext")]
 pub use full_text_search::{register_full_text_search, FullTextSearchFunction};
diff --git a/crates/integrations/datafusion/tests/ddl_tests.rs 
b/crates/integrations/datafusion/tests/ddl_tests.rs
new file mode 100644
index 0000000..c520f18
--- /dev/null
+++ b/crates/integrations/datafusion/tests/ddl_tests.rs
@@ -0,0 +1,497 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DDL integration tests for paimon-datafusion.
+
+use std::sync::Arc;
+
+use datafusion::catalog::CatalogProvider;
+use datafusion::prelude::SessionContext;
+use paimon::catalog::Identifier;
+use paimon::spec::{ArrayType, DataType, IntType, MapType, VarCharType};
+use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
+use paimon_datafusion::{PaimonCatalogProvider, PaimonDdlHandler, 
PaimonRelationPlanner};
+use tempfile::TempDir;
+
+fn create_test_env() -> (TempDir, Arc<FileSystemCatalog>) {
+    let temp_dir = TempDir::new().expect("Failed to create temp dir");
+    let warehouse = format!("file://{}", temp_dir.path().display());
+    let mut options = Options::new();
+    options.set(CatalogOptions::WAREHOUSE, warehouse);
+    let catalog = FileSystemCatalog::new(options).expect("Failed to create 
catalog");
+    (temp_dir, Arc::new(catalog))
+}
+
+fn create_handler(catalog: Arc<FileSystemCatalog>) -> PaimonDdlHandler {
+    let ctx = SessionContext::new();
+    ctx.register_catalog(
+        "paimon",
+        Arc::new(PaimonCatalogProvider::new(catalog.clone())),
+    );
+    ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new()))
+        .expect("Failed to register relation planner");
+    PaimonDdlHandler::new(ctx, catalog, "paimon")
+}
+
+// ======================= CREATE / DROP SCHEMA =======================
+
+#[tokio::test]
+async fn test_create_schema() {
+    let (_tmp, catalog) = create_test_env();
+    let handler = create_handler(catalog.clone());
+
+    handler
+        .sql("CREATE SCHEMA paimon.test_db")
+        .await
+        .expect("CREATE SCHEMA should succeed");
+
+    let databases = catalog.list_databases().await.unwrap();
+    assert!(
+        databases.contains(&"test_db".to_string()),
+        "Database test_db should exist after CREATE SCHEMA"
+    );
+}
+
+#[tokio::test]
+async fn test_drop_schema() {
+    let (_tmp, catalog) = create_test_env();
+    let handler = create_handler(catalog.clone());
+
+    catalog
+        .create_database("drop_me", false, Default::default())
+        .await
+        .unwrap();
+
+    handler
+        .sql("DROP SCHEMA paimon.drop_me CASCADE")
+        .await
+        .expect("DROP SCHEMA should succeed");
+
+    let databases = catalog.list_databases().await.unwrap();
+    assert!(
+        !databases.contains(&"drop_me".to_string()),
+        "Database drop_me should not exist after DROP SCHEMA"
+    );
+}
+
+#[tokio::test]
+async fn test_schema_names_via_catalog_provider() {
+    let (_tmp, catalog) = create_test_env();
+    let provider = PaimonCatalogProvider::new(catalog.clone());
+
+    catalog
+        .create_database("db_a", false, Default::default())
+        .await
+        .unwrap();
+    catalog
+        .create_database("db_b", false, Default::default())
+        .await
+        .unwrap();
+
+    let names = provider.schema_names();
+    assert!(names.contains(&"db_a".to_string()));
+    assert!(names.contains(&"db_b".to_string()));
+}
+
+// ======================= CREATE TABLE =======================
+
+#[tokio::test]
+async fn test_create_table() {
+    let (_tmp, catalog) = create_test_env();
+    let handler = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    handler
+        .sql(
+            "CREATE TABLE paimon.mydb.users (
+                id INT NOT NULL,
+                name STRING,
+                age INT,
+                PRIMARY KEY (id)
+            )",
+        )
+        .await
+        .expect("CREATE TABLE should succeed");
+
+    let tables = catalog.list_tables("mydb").await.unwrap();
+    assert!(
+        tables.contains(&"users".to_string()),
+        "Table users should exist after CREATE TABLE"
+    );
+
+    // Verify schema
+    let table = catalog
+        .get_table(&Identifier::new("mydb", "users"))
+        .await
+        .unwrap();
+    let schema = table.schema();
+    assert_eq!(schema.fields().len(), 3);
+    assert_eq!(schema.primary_keys(), &["id"]);
+}
+
+#[tokio::test]
+async fn test_create_table_with_partition() {
+    let (_tmp, catalog) = create_test_env();
+    let handler = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    handler
+        .sql(
+            "CREATE TABLE paimon.mydb.events (
+                id INT NOT NULL,
+                name STRING,
+                dt STRING,
+                PRIMARY KEY (id, dt)
+            ) PARTITIONED BY (dt STRING)
+            WITH ('bucket' = '2')",
+        )
+        .await
+        .expect("CREATE TABLE with partition should succeed");
+
+    let table = catalog
+        .get_table(&Identifier::new("mydb", "events"))
+        .await
+        .unwrap();
+    let schema = table.schema();
+    assert_eq!(schema.partition_keys(), &["dt"]);
+    assert_eq!(schema.primary_keys(), &["id", "dt"]);
+    assert_eq!(
+        schema.options().get("bucket"),
+        Some(&"2".to_string()),
+        "Table option 'bucket' should be preserved"
+    );
+}
+
+#[tokio::test]
+async fn test_create_table_if_not_exists() {
+    let (_tmp, catalog) = create_test_env();
+    let handler = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    let sql = "CREATE TABLE IF NOT EXISTS paimon.mydb.t1 (
+        id INT NOT NULL
+    )";
+
+    // First create should succeed
+    handler.sql(sql).await.expect("First CREATE should succeed");
+
+    // Second create with IF NOT EXISTS should also succeed
+    handler
+        .sql(sql)
+        .await
+        .expect("Second CREATE with IF NOT EXISTS should succeed");
+}
+
+#[tokio::test]
+async fn test_create_external_table_rejected() {
+    let (_tmp, catalog) = create_test_env();
+    let handler = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    let result = handler
+        .sql(
+            "CREATE EXTERNAL TABLE paimon.mydb.bad (
+                id INT NOT NULL
+            ) STORED AS PARQUET
+            LOCATION '/some/path'",
+        )
+        .await;
+
+    assert!(result.is_err(), "CREATE EXTERNAL TABLE should be rejected");
+    let err_msg = result.unwrap_err().to_string();
+    assert!(
+        err_msg.contains("CREATE EXTERNAL TABLE is not supported"),
+        "Error should mention CREATE EXTERNAL TABLE is not supported, got: 
{err_msg}"
+    );
+}
+
+// ======================= CREATE TABLE with complex types 
=======================
+
+#[tokio::test]
+async fn test_create_table_with_array_and_map() {
+    let (_tmp, catalog) = create_test_env();
+    let handler = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    handler
+        .sql(
+            "CREATE TABLE paimon.mydb.complex_types (
+                id INT NOT NULL,
+                tags ARRAY<STRING>,
+                props MAP(STRING, INT),
+                PRIMARY KEY (id)
+            )",
+        )
+        .await
+        .expect("CREATE TABLE with ARRAY and MAP should succeed");
+
+    let table = catalog
+        .get_table(&Identifier::new("mydb", "complex_types"))
+        .await
+        .unwrap();
+    let schema = table.schema();
+    assert_eq!(schema.fields().len(), 3);
+    assert_eq!(schema.primary_keys(), &["id"]);
+
+    // Verify ARRAY<STRING> column
+    let tags_field = &schema.fields()[1];
+    assert_eq!(tags_field.name(), "tags");
+    assert_eq!(
+        *tags_field.data_type(),
+        DataType::Array(ArrayType::new(
+            DataType::VarChar(VarCharType::string_type())
+        ))
+    );
+
+    // Verify MAP(STRING, INT) column
+    let props_field = &schema.fields()[2];
+    assert_eq!(props_field.name(), "props");
+    assert_eq!(
+        *props_field.data_type(),
+        DataType::Map(MapType::new(
+            DataType::VarChar(VarCharType::string_type())
+                .copy_with_nullable(false)
+                .unwrap(),
+            DataType::Int(IntType::new()),
+        ))
+    );
+}
+
+#[tokio::test]
+async fn test_create_table_with_row_type() {
+    let (_tmp, catalog) = create_test_env();
+    let handler = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    handler
+        .sql(
+            "CREATE TABLE paimon.mydb.row_table (
+                id INT NOT NULL,
+                address STRUCT<city STRING, zip INT>,
+                PRIMARY KEY (id)
+            )",
+        )
+        .await
+        .expect("CREATE TABLE with STRUCT should succeed");
+
+    let table = catalog
+        .get_table(&Identifier::new("mydb", "row_table"))
+        .await
+        .unwrap();
+    let schema = table.schema();
+    assert_eq!(schema.fields().len(), 2);
+
+    // Verify STRUCT<city STRING, zip INT> column
+    let address_field = &schema.fields()[1];
+    assert_eq!(address_field.name(), "address");
+    if let DataType::Row(row) = address_field.data_type() {
+        assert_eq!(row.fields().len(), 2);
+        assert_eq!(row.fields()[0].name(), "city");
+        assert!(matches!(row.fields()[0].data_type(), DataType::VarChar(_)));
+        assert_eq!(row.fields()[1].name(), "zip");
+        assert!(matches!(row.fields()[1].data_type(), DataType::Int(_)));
+    } else {
+        panic!("expected Row type for address column");
+    }
+}
+
+// ======================= DROP TABLE =======================
+
+#[tokio::test]
+async fn test_drop_table() {
+    let (_tmp, catalog) = create_test_env();
+    let handler = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    // Create a table first
+    let schema = paimon::spec::Schema::builder()
+        .column(
+            "id",
+            paimon::spec::DataType::Int(paimon::spec::IntType::new()),
+        )
+        .build()
+        .unwrap();
+    catalog
+        .create_table(&Identifier::new("mydb", "to_drop"), schema, false)
+        .await
+        .unwrap();
+
+    assert!(catalog
+        .list_tables("mydb")
+        .await
+        .unwrap()
+        .contains(&"to_drop".to_string()));
+
+    handler
+        .sql("DROP TABLE paimon.mydb.to_drop")
+        .await
+        .expect("DROP TABLE should succeed");
+
+    assert!(
+        !catalog
+            .list_tables("mydb")
+            .await
+            .unwrap()
+            .contains(&"to_drop".to_string()),
+        "Table should not exist after DROP TABLE"
+    );
+}
+
+// ======================= ALTER TABLE =======================
+
+#[tokio::test]
+async fn test_alter_table_add_column() {
+    let (_tmp, catalog) = create_test_env();
+    let handler = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    let schema = paimon::spec::Schema::builder()
+        .column(
+            "id",
+            paimon::spec::DataType::Int(paimon::spec::IntType::new()),
+        )
+        .column(
+            "name",
+            
paimon::spec::DataType::VarChar(paimon::spec::VarCharType::string_type()),
+        )
+        .build()
+        .unwrap();
+    catalog
+        .create_table(&Identifier::new("mydb", "alter_test"), schema, false)
+        .await
+        .unwrap();
+
+    // ALTER TABLE is not yet implemented in FileSystemCatalog, so we expect 
an error
+    let result = handler
+        .sql("ALTER TABLE paimon.mydb.alter_test ADD COLUMN age INT")
+        .await;
+
+    // FileSystemCatalog returns Unsupported for alter_table, which is expected
+    assert!(
+        result.is_err(),
+        "ALTER TABLE should fail because FileSystemCatalog does not implement 
alter_table yet"
+    );
+    let err_msg = result.unwrap_err().to_string();
+    assert!(
+        err_msg.contains("not yet implemented") || 
err_msg.contains("Unsupported"),
+        "Error should indicate alter_table is not implemented, got: {err_msg}"
+    );
+}
+
+#[tokio::test]
+async fn test_alter_table_rename() {
+    let (_tmp, catalog) = create_test_env();
+    let handler = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    let schema = paimon::spec::Schema::builder()
+        .column(
+            "id",
+            paimon::spec::DataType::Int(paimon::spec::IntType::new()),
+        )
+        .build()
+        .unwrap();
+    catalog
+        .create_table(&Identifier::new("mydb", "old_name"), schema, false)
+        .await
+        .unwrap();
+
+    handler
+        .sql("ALTER TABLE mydb.old_name RENAME TO new_name")
+        .await
+        .expect("ALTER TABLE RENAME should succeed");
+
+    let tables = catalog.list_tables("mydb").await.unwrap();
+    assert!(
+        !tables.contains(&"old_name".to_string()),
+        "old_name should not exist after rename"
+    );
+    assert!(
+        tables.contains(&"new_name".to_string()),
+        "new_name should exist after rename"
+    );
+}
+
+#[tokio::test]
+async fn test_ddl_handler_delegates_select() {
+    let (_tmp, catalog) = create_test_env();
+    let handler = create_handler(catalog.clone());
+
+    catalog
+        .create_database("mydb", false, Default::default())
+        .await
+        .unwrap();
+
+    let schema = paimon::spec::Schema::builder()
+        .column(
+            "id",
+            paimon::spec::DataType::Int(paimon::spec::IntType::new()),
+        )
+        .build()
+        .unwrap();
+    catalog
+        .create_table(&Identifier::new("mydb", "t1"), schema, false)
+        .await
+        .unwrap();
+
+    // SELECT should be delegated to DataFusion
+    let df = handler
+        .sql("SELECT * FROM paimon.mydb.t1")
+        .await
+        .expect("SELECT should be delegated to DataFusion");
+
+    let batches = df.collect().await.expect("SELECT should execute");
+    // Empty table, but should succeed
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(total_rows, 0, "Empty table should return 0 rows");
+}
diff --git a/crates/paimon/src/arrow/mod.rs b/crates/paimon/src/arrow/mod.rs
index f48b9da..e2f60fc 100644
--- a/crates/paimon/src/arrow/mod.rs
+++ b/crates/paimon/src/arrow/mod.rs
@@ -22,7 +22,11 @@ pub(crate) mod schema_evolution;
 
 pub use crate::arrow::reader::ArrowReaderBuilder;
 
-use crate::spec::{DataField, DataType as PaimonDataType};
+use crate::spec::{
+    ArrayType, BigIntType, BooleanType, DataField, DataType as PaimonDataType, 
DateType,
+    DecimalType, DoubleType, FloatType, IntType, LocalZonedTimestampType, 
MapType, RowType,
+    SmallIntType, TimeType, TimestampType, TinyIntType, VarBinaryType, 
VarCharType,
+};
 use arrow_schema::DataType as ArrowDataType;
 use arrow_schema::{Field as ArrowField, Schema as ArrowSchema, TimeUnit};
 use std::sync::Arc;
@@ -128,6 +132,114 @@ fn timestamp_time_unit(precision: u32) -> 
crate::Result<TimeUnit> {
     }
 }
 
+/// Convert an Arrow [`DataType`](ArrowDataType) to a Paimon 
[`DataType`](PaimonDataType).
+pub fn arrow_to_paimon_type(
+    arrow_type: &ArrowDataType,
+    nullable: bool,
+) -> crate::Result<PaimonDataType> {
+    match arrow_type {
+        ArrowDataType::Boolean => 
Ok(PaimonDataType::Boolean(BooleanType::with_nullable(
+            nullable,
+        ))),
+        ArrowDataType::Int8 => 
Ok(PaimonDataType::TinyInt(TinyIntType::with_nullable(
+            nullable,
+        ))),
+        ArrowDataType::Int16 => 
Ok(PaimonDataType::SmallInt(SmallIntType::with_nullable(
+            nullable,
+        ))),
+        ArrowDataType::Int32 => 
Ok(PaimonDataType::Int(IntType::with_nullable(nullable))),
+        ArrowDataType::Int64 => 
Ok(PaimonDataType::BigInt(BigIntType::with_nullable(nullable))),
+        ArrowDataType::Float32 => 
Ok(PaimonDataType::Float(FloatType::with_nullable(nullable))),
+        ArrowDataType::Float64 => 
Ok(PaimonDataType::Double(DoubleType::with_nullable(nullable))),
+        ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | 
ArrowDataType::Utf8View => {
+            Ok(PaimonDataType::VarChar(VarCharType::with_nullable(
+                nullable,
+                VarCharType::MAX_LENGTH,
+            )?))
+        }
+        ArrowDataType::Binary | ArrowDataType::LargeBinary | 
ArrowDataType::BinaryView => Ok(
+            PaimonDataType::VarBinary(VarBinaryType::try_new(nullable, 
VarBinaryType::MAX_LENGTH)?),
+        ),
+        ArrowDataType::Date32 => 
Ok(PaimonDataType::Date(DateType::with_nullable(nullable))),
+        ArrowDataType::Timestamp(unit, tz) => {
+            let precision = match unit {
+                TimeUnit::Second => 0,
+                TimeUnit::Millisecond => 3,
+                TimeUnit::Microsecond => 6,
+                TimeUnit::Nanosecond => 9,
+            };
+            if tz.is_some() {
+                Ok(PaimonDataType::LocalZonedTimestamp(
+                    LocalZonedTimestampType::with_nullable(nullable, 
precision)?,
+                ))
+            } else {
+                Ok(PaimonDataType::Timestamp(TimestampType::with_nullable(
+                    nullable, precision,
+                )?))
+            }
+        }
+        ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => {
+            let precision = match arrow_type {
+                ArrowDataType::Time32(TimeUnit::Second) => 0,
+                ArrowDataType::Time32(TimeUnit::Millisecond) => 3,
+                ArrowDataType::Time64(TimeUnit::Microsecond) => 6,
+                ArrowDataType::Time64(TimeUnit::Nanosecond) => 9,
+                _ => 0,
+            };
+            Ok(PaimonDataType::Time(TimeType::with_nullable(
+                nullable, precision,
+            )?))
+        }
+        ArrowDataType::Decimal128(p, s) => 
Ok(PaimonDataType::Decimal(DecimalType::with_nullable(
+            nullable, *p as u32, *s as u32,
+        )?)),
+        ArrowDataType::List(field) | ArrowDataType::LargeList(field) => {
+            let element = arrow_to_paimon_type(field.data_type(), 
field.is_nullable())?;
+            Ok(PaimonDataType::Array(ArrayType::with_nullable(
+                nullable, element,
+            )))
+        }
+        ArrowDataType::Map(entries_field, _) => {
+            if let ArrowDataType::Struct(fields) = entries_field.data_type() {
+                if fields.len() == 2 {
+                    let key = arrow_to_paimon_type(fields[0].data_type(), 
fields[0].is_nullable())?;
+                    let value =
+                        arrow_to_paimon_type(fields[1].data_type(), 
fields[1].is_nullable())?;
+                    return Ok(PaimonDataType::Map(MapType::with_nullable(
+                        nullable, key, value,
+                    )));
+                }
+            }
+            Err(crate::Error::Unsupported {
+                message: format!("Unsupported Map structure: {arrow_type:?}"),
+            })
+        }
+        ArrowDataType::Struct(fields) => {
+            let field_slice: Vec<ArrowField> = fields.iter().map(|f| 
f.as_ref().clone()).collect();
+            let paimon_fields = arrow_fields_to_paimon(&field_slice)?;
+            Ok(PaimonDataType::Row(RowType::with_nullable(
+                nullable,
+                paimon_fields,
+            )))
+        }
+        _ => Err(crate::Error::Unsupported {
+            message: format!("Unsupported Arrow type for Paimon conversion: 
{arrow_type:?}"),
+        }),
+    }
+}
+
+/// Convert Arrow fields to Paimon [`DataField`]s with auto-assigned IDs 
starting from 0.
+pub fn arrow_fields_to_paimon(fields: &[ArrowField]) -> 
crate::Result<Vec<DataField>> {
+    fields
+        .iter()
+        .enumerate()
+        .map(|(i, f)| {
+            let paimon_type = arrow_to_paimon_type(f.data_type(), 
f.is_nullable())?;
+            Ok(DataField::new(i as i32, f.name().clone(), paimon_type))
+        })
+        .collect()
+}
+
 /// Build an Arrow [`Schema`](ArrowSchema) from Paimon [`DataField`]s.
 pub fn build_target_arrow_schema(fields: &[DataField]) -> 
crate::Result<Arc<ArrowSchema>> {
     let arrow_fields: Vec<ArrowField> = fields
@@ -143,3 +255,213 @@ pub fn build_target_arrow_schema(fields: &[DataField]) -> 
crate::Result<Arc<Arro
         .collect::<crate::Result<Vec<_>>>()?;
     Ok(Arc::new(ArrowSchema::new(arrow_fields)))
 }
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::spec::*;

    /// Helper: convert paimon -> arrow, assert the resulting arrow type matches expected.
    fn assert_paimon_to_arrow(paimon: &PaimonDataType, expected_arrow: &ArrowDataType) {
        let arrow = paimon_type_to_arrow(paimon).unwrap();
        assert_eq!(&arrow, expected_arrow, "paimon_type_to_arrow mismatch");
    }

    /// Helper: arrow -> paimon, assert the paimon type variant matches.
    fn assert_arrow_to_paimon(
        arrow: &ArrowDataType,
        nullable: bool,
        expected_paimon: &PaimonDataType,
    ) {
        let paimon = arrow_to_paimon_type(arrow, nullable).unwrap();
        assert_eq!(&paimon, expected_paimon, "arrow_to_paimon_type mismatch");
    }

    /// Fixed-width primitive types convert losslessly in both directions.
    #[test]
    fn test_primitive_roundtrip() {
        let cases: Vec<(PaimonDataType, ArrowDataType)> = vec![
            (
                PaimonDataType::Boolean(BooleanType::new()),
                ArrowDataType::Boolean,
            ),
            (
                PaimonDataType::TinyInt(TinyIntType::new()),
                ArrowDataType::Int8,
            ),
            (
                PaimonDataType::SmallInt(SmallIntType::new()),
                ArrowDataType::Int16,
            ),
            (PaimonDataType::Int(IntType::new()), ArrowDataType::Int32),
            (
                PaimonDataType::BigInt(BigIntType::new()),
                ArrowDataType::Int64,
            ),
            (
                PaimonDataType::Float(FloatType::new()),
                ArrowDataType::Float32,
            ),
            (
                PaimonDataType::Double(DoubleType::new()),
                ArrowDataType::Float64,
            ),
            (PaimonDataType::Date(DateType::new()), ArrowDataType::Date32),
        ];
        for (paimon, arrow) in &cases {
            assert_paimon_to_arrow(paimon, arrow);
            assert_arrow_to_paimon(arrow, true, paimon);
        }
    }

    /// VarChar maps to Utf8 going out; every Arrow string flavor maps back to VarChar.
    #[test]
    fn test_string_types() {
        let varchar = PaimonDataType::VarChar(VarCharType::new(VarCharType::MAX_LENGTH).unwrap());
        assert_paimon_to_arrow(&varchar, &ArrowDataType::Utf8);

        // All string-like arrow types map to VarChar
        for arrow in &[
            ArrowDataType::Utf8,
            ArrowDataType::LargeUtf8,
            ArrowDataType::Utf8View,
        ] {
            assert_arrow_to_paimon(arrow, true, &varchar);
        }
    }

    /// VarBinary maps to Binary going out; every Arrow binary flavor maps back to VarBinary.
    #[test]
    fn test_binary_types() {
        let varbinary = PaimonDataType::VarBinary(
            VarBinaryType::try_new(true, VarBinaryType::MAX_LENGTH).unwrap(),
        );
        assert_paimon_to_arrow(&varbinary, &ArrowDataType::Binary);

        for arrow in &[
            ArrowDataType::Binary,
            ArrowDataType::LargeBinary,
            ArrowDataType::BinaryView,
        ] {
            assert_arrow_to_paimon(arrow, true, &varbinary);
        }
    }

    /// Paimon timestamp precisions 3/6/9 pair with Arrow milli/micro/nano units.
    #[test]
    fn test_timestamp_roundtrip() {
        // millisecond precision
        let ts3 = PaimonDataType::Timestamp(TimestampType::new(3).unwrap());
        assert_paimon_to_arrow(&ts3, &ArrowDataType::Timestamp(TimeUnit::Millisecond, None));
        assert_arrow_to_paimon(
            &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
            true,
            &ts3,
        );

        // microsecond precision
        let ts6 = PaimonDataType::Timestamp(TimestampType::new(6).unwrap());
        assert_paimon_to_arrow(&ts6, &ArrowDataType::Timestamp(TimeUnit::Microsecond, None));
        assert_arrow_to_paimon(
            &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
            true,
            &ts6,
        );

        // nanosecond precision
        let ts9 = PaimonDataType::Timestamp(TimestampType::new(9).unwrap());
        assert_paimon_to_arrow(&ts9, &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None));
        assert_arrow_to_paimon(
            &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
            true,
            &ts9,
        );
    }

    /// A time-zone-bearing Arrow timestamp corresponds to Paimon's LocalZonedTimestamp.
    #[test]
    fn test_local_zoned_timestamp() {
        let lzts = PaimonDataType::LocalZonedTimestamp(LocalZonedTimestampType::new(3).unwrap());
        let arrow = ArrowDataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()));
        assert_paimon_to_arrow(&lzts, &arrow);
        assert_arrow_to_paimon(&arrow, true, &lzts);
    }

    /// Precision and scale survive the Decimal <-> Decimal128 roundtrip.
    #[test]
    fn test_decimal_roundtrip() {
        let dec = PaimonDataType::Decimal(DecimalType::new(10, 2).unwrap());
        assert_paimon_to_arrow(&dec, &ArrowDataType::Decimal128(10, 2));
        assert_arrow_to_paimon(&ArrowDataType::Decimal128(10, 2), true, &dec);
    }

    /// Arrays map to Arrow lists; the list element field name is irrelevant coming back.
    #[test]
    fn test_array_roundtrip() {
        let paimon_arr = PaimonDataType::Array(ArrayType::new(PaimonDataType::Int(IntType::new())));
        let arrow_list = ArrowDataType::List(Arc::new(ArrowField::new(
            "element",
            ArrowDataType::Int32,
            true,
        )));
        assert_paimon_to_arrow(&paimon_arr, &arrow_list);

        // arrow -> paimon: element field name doesn't matter
        let arrow_list2 = ArrowDataType::List(Arc::new(ArrowField::new(
            "item",
            ArrowDataType::Int32,
            true,
        )));
        let result = arrow_to_paimon_type(&arrow_list2, true).unwrap();
        assert!(matches!(result, PaimonDataType::Array(_)));
    }

    /// A Paimon map survives a roundtrip through Arrow's Map encoding as a Map variant.
    #[test]
    fn test_map_roundtrip() {
        let paimon_map = PaimonDataType::Map(MapType::new(
            PaimonDataType::VarChar(VarCharType::new(VarCharType::MAX_LENGTH).unwrap()),
            PaimonDataType::Int(IntType::new()),
        ));
        let arrow_map = paimon_type_to_arrow(&paimon_map).unwrap();
        let back = arrow_to_paimon_type(&arrow_map, true).unwrap();
        assert!(matches!(back, PaimonDataType::Map(_)));
    }

    /// A Paimon row survives a roundtrip through an Arrow struct as a Row variant.
    #[test]
    fn test_row_roundtrip() {
        let row = PaimonDataType::Row(RowType::new(vec![
            DataField::new(0, "a".to_string(), PaimonDataType::Int(IntType::new())),
            DataField::new(
                1,
                "b".to_string(),
                PaimonDataType::VarChar(VarCharType::new(VarCharType::MAX_LENGTH).unwrap()),
            ),
        ]));
        let arrow = paimon_type_to_arrow(&row).unwrap();
        let back = arrow_to_paimon_type(&arrow, true).unwrap();
        assert!(matches!(back, PaimonDataType::Row(_)));
    }

    /// The `nullable` flag passed to the converter is reflected on the Paimon type.
    #[test]
    fn test_not_nullable() {
        let paimon = arrow_to_paimon_type(&ArrowDataType::Int32, false).unwrap();
        assert!(!paimon.is_nullable());

        let paimon = arrow_to_paimon_type(&ArrowDataType::Int32, true).unwrap();
        assert!(paimon.is_nullable());
    }

    /// Arrow types with no Paimon counterpart produce an error rather than a panic.
    #[test]
    fn test_unsupported_arrow_type() {
        let result = arrow_to_paimon_type(&ArrowDataType::Duration(TimeUnit::Second), true);
        assert!(result.is_err());
    }

    /// Field IDs are assigned positionally (0, 1, ...) and nullability is preserved.
    #[test]
    fn test_arrow_fields_to_paimon_ids() {
        let fields = vec![
            ArrowField::new("x", ArrowDataType::Int32, true),
            ArrowField::new("y", ArrowDataType::Utf8, false),
        ];
        let paimon_fields = arrow_fields_to_paimon(&fields).unwrap();
        assert_eq!(paimon_fields.len(), 2);
        assert_eq!(paimon_fields[0].id(), 0);
        assert_eq!(paimon_fields[0].name(), "x");
        assert_eq!(paimon_fields[1].id(), 1);
        assert_eq!(paimon_fields[1].name(), "y");
        assert!(!paimon_fields[1].data_type().is_nullable());
    }
}
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index b0baf66..f1dba00 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::spec::core_options::CoreOptions;
-use crate::spec::types::{DataType, RowType};
+use crate::spec::types::{ArrayType, DataType, MapType, MultisetType, RowType};
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
 use std::collections::{HashMap, HashSet};
@@ -473,23 +473,16 @@ impl SchemaBuilder {
     }
 
     /// Add a column with optional description.
-    ///
-    /// TODO: Support RowType in schema columns with field ID assignment for 
nested fields.
-    /// See <https://github.com/apache/paimon/pull/1547>.
     pub fn column_with_description(
         mut self,
         column_name: impl Into<String>,
         data_type: DataType,
         description: Option<String>,
     ) -> Self {
-        if data_type.contains_row_type() {
-            todo!(
-                "Column type containing RowType is not supported yet: field ID 
assignment for nested row fields is not implemented. See 
https://github.com/apache/paimon/pull/1547";
-            );
-        }
         let name = column_name.into();
         let id = self.next_field_id;
         self.next_field_id += 1;
+        let data_type = Self::assign_nested_field_ids(data_type, &mut 
self.next_field_id);
         self.columns
             .push(DataField::new(id, name, 
data_type).with_description(description));
         self
@@ -535,6 +528,40 @@ impl SchemaBuilder {
             self.comment,
         )
     }
+
+    /// Recursively assign field IDs to nested fields in complex types.
+    fn assign_nested_field_ids(data_type: DataType, next_id: &mut i32) -> 
DataType {
+        let nullable = data_type.is_nullable();
+        match data_type {
+            DataType::Row(row) => {
+                let fields = row
+                    .fields()
+                    .iter()
+                    .map(|f| {
+                        let id = *next_id;
+                        *next_id += 1;
+                        let typ = 
Self::assign_nested_field_ids(f.data_type().clone(), next_id);
+                        DataField::new(id, f.name().to_string(), typ)
+                    })
+                    .collect();
+                DataType::Row(RowType::with_nullable(nullable, fields))
+            }
+            DataType::Array(arr) => {
+                let element = 
Self::assign_nested_field_ids(arr.element_type().clone(), next_id);
+                DataType::Array(ArrayType::with_nullable(nullable, element))
+            }
+            DataType::Map(map) => {
+                let key = 
Self::assign_nested_field_ids(map.key_type().clone(), next_id);
+                let value = 
Self::assign_nested_field_ids(map.value_type().clone(), next_id);
+                DataType::Map(MapType::with_nullable(nullable, key, value))
+            }
+            DataType::Multiset(ms) => {
+                let element = 
Self::assign_nested_field_ids(ms.element_type().clone(), next_id);
+                DataType::Multiset(MultisetType::with_nullable(nullable, 
element))
+            }
+            other => other,
+        }
+    }
 }
 
 impl Default for SchemaBuilder {
@@ -718,18 +745,29 @@ mod tests {
         assert_eq!(schema.primary_keys(), &["a", "b"]);
     }
 
-    /// Adding a column whose type is or contains RowType panics (todo! until 
field ID assignment for nested row fields).
-    /// See <https://github.com/apache/paimon/pull/1547>.
     #[test]
-    #[should_panic(expected = "RowType")]
-    fn test_schema_builder_column_row_type_panics() {
+    fn test_schema_builder_column_row_type() {
         let row_type = RowType::new(vec![DataField::new(
             0,
             "nested".into(),
             DataType::Int(IntType::new()),
         )]);
-        Schema::builder()
+        let schema = Schema::builder()
             .column("id", DataType::Int(IntType::new()))
-            .column("payload", DataType::Row(row_type));
+            .column("payload", DataType::Row(row_type))
+            .build()
+            .unwrap();
+
+        assert_eq!(schema.fields().len(), 2);
+        // id gets field_id=0, payload gets field_id=1, nested gets field_id=2
+        assert_eq!(schema.fields()[0].id(), 0);
+        assert_eq!(schema.fields()[1].id(), 1);
+        if let DataType::Row(row) = schema.fields()[1].data_type() {
+            assert_eq!(row.fields().len(), 1);
+            assert_eq!(row.fields()[0].id(), 2);
+            assert_eq!(row.fields()[0].name(), "nested");
+        } else {
+            panic!("expected Row type");
+        }
     }
 }

Reply via email to