This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 5cf85dab9 feat(datafusion): Support `CREATE TABLE` for DataFusion
(#1972)
5cf85dab9 is described below
commit 5cf85dab903e171a077481597be1e49a1159715a
Author: Shawn Chang <[email protected]>
AuthorDate: Tue Jan 13 16:00:03 2026 -0800
feat(datafusion): Support `CREATE TABLE` for DataFusion (#1972)
## Which issue does this PR close?
- Closes #1905
## What changes are included in this PR?
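- Implemented `register_table` in `IcebergSchemaProvider`, so `CREATE TABLE` creates an empty Iceberg table in the catalog
- Added a new sqllogictest file: `create_table.slt`
- Updated existing slts: `insert_into.slt` now creates its table with `CREATE TABLE`, and `show_tables.slt` no longer expects the unpartitioned table that the test engine used to create
Note: This does NOT cover the syntax `CREATE TABLE ... AS VALUES`; `register_table` rejects input tables that contain rows.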
## Are these changes tested?
Yes
---
Cargo.lock | 1 +
Cargo.toml | 1 +
crates/integrations/datafusion/Cargo.toml | 1 +
crates/integrations/datafusion/src/schema.rs | 272 +++++++++++++++++++--
crates/sqllogictest/src/engine/datafusion.rs | 31 +--
.../sqllogictest/testdata/schedules/df_test.toml | 4 +
.../testdata/slts/df_test/create_table.slt | 90 +++++++
.../testdata/slts/df_test/insert_into.slt | 4 +
.../testdata/slts/df_test/show_tables.slt | 3 -
9 files changed, 358 insertions(+), 49 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 51af571d5..3de43e685 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3525,6 +3525,7 @@ version = "0.8.0"
dependencies = [
"anyhow",
"async-trait",
+ "dashmap",
"datafusion",
"expect-test",
"futures",
diff --git a/Cargo.toml b/Cargo.toml
index 56cd1801c..517bfa36e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -62,6 +62,7 @@ bytes = "1.10"
chrono = "0.4.41"
clap = { version = "4.5.48", features = ["derive", "cargo"] }
ctor = "0.2.8"
+dashmap = "6"
datafusion = "51.0"
datafusion-cli = "51.0"
datafusion-sqllogictest = "51.0"
diff --git a/crates/integrations/datafusion/Cargo.toml
b/crates/integrations/datafusion/Cargo.toml
index 0ee1738b4..fd3e489e4 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -31,6 +31,7 @@ repository = { workspace = true }
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
+dashmap = { workspace = true }
datafusion = { workspace = true }
futures = { workspace = true }
iceberg = { workspace = true }
diff --git a/crates/integrations/datafusion/src/schema.rs
b/crates/integrations/datafusion/src/schema.rs
index 31bbdbd67..022964ba6 100644
--- a/crates/integrations/datafusion/src/schema.rs
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -16,16 +16,20 @@
// under the License.
use std::any::Any;
-use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
+use dashmap::DashMap;
use datafusion::catalog::SchemaProvider;
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result as DFResult};
+use datafusion::execution::TaskContext;
+use datafusion::prelude::SessionContext;
+use futures::StreamExt;
use futures::future::try_join_all;
+use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids;
use iceberg::inspect::MetadataTableType;
-use iceberg::{Catalog, NamespaceIdent, Result};
+use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result,
TableCreation};
use crate::table::IcebergTableProvider;
use crate::to_datafusion_error;
@@ -34,10 +38,15 @@ use crate::to_datafusion_error;
/// access to table providers within a specific namespace.
#[derive(Debug)]
pub(crate) struct IcebergSchemaProvider {
- /// A `HashMap` where keys are table names
+ /// Reference to the Iceberg catalog
+ catalog: Arc<dyn Catalog>,
+ /// The namespace this schema represents
+ namespace: NamespaceIdent,
+ /// A concurrent map where keys are table names
/// and values are dynamic references to objects implementing the
/// [`TableProvider`] trait.
- tables: HashMap<String, Arc<IcebergTableProvider>>,
+ /// Wrapped in Arc to allow sharing across async boundaries in register_table.
+ /// NOTE(review): this is a local cache, filled when the provider is built and
+ /// when `register_table` succeeds; tables created in the catalog by other
+ /// clients afterwards are not visible here unless another code path adds them.
+ tables: Arc<DashMap<String, Arc<IcebergTableProvider>>>,
}
impl IcebergSchemaProvider {
@@ -71,13 +80,16 @@ impl IcebergSchemaProvider {
)
.await?;
- let tables: HashMap<String, Arc<IcebergTableProvider>> = table_names
- .into_iter()
- .zip(providers.into_iter())
- .map(|(name, provider)| (name, Arc::new(provider)))
- .collect();
+ // Index the loaded providers by table name. NOTE(review): the zip below
+ // assumes `providers` is in the same order as `table_names`.
+ let tables = Arc::new(DashMap::new());
+ for (name, provider) in
table_names.into_iter().zip(providers.into_iter()) {
+ tables.insert(name, Arc::new(provider));
+ }
- Ok(IcebergSchemaProvider { tables })
+ Ok(IcebergSchemaProvider {
+ catalog: client,
+ namespace,
+ tables,
+ })
}
}
@@ -89,13 +101,16 @@ impl SchemaProvider for IcebergSchemaProvider {
+ /// Lists each cached table name followed by its metadata-table names, which
+ /// are formatted as `<table>$<metadata table type>`.
fn table_names(&self) -> Vec<String> {
self.tables
- .keys()
- .flat_map(|table_name| {
+ .iter()
+ .flat_map(|entry| {
+ // Clone the key so the `move` closure below owns it and the returned
+ // iterator does not borrow `entry`.
+ let table_name = entry.key().clone();
[table_name.clone()]
.into_iter()
-
.chain(MetadataTableType::all_types().map(|metadata_table_name| {
- format!("{}${}", table_name.clone(),
metadata_table_name.as_str())
- }))
+ .chain(
+ MetadataTableType::all_types().map(move
|metadata_table_name| {
+ format!("{}${}", table_name,
metadata_table_name.as_str())
+ }),
+ )
})
.collect()
}
@@ -127,7 +142,230 @@ impl SchemaProvider for IcebergSchemaProvider {
         Ok(self
             .tables
             .get(name)
-            .cloned()
-            .map(|t| t as Arc<dyn TableProvider>))
+            .map(|entry| entry.value().clone() as Arc<dyn TableProvider>))
+    }
+
+    /// Creates an empty Iceberg table named `name` in this schema's namespace
+    /// and caches an [`IcebergTableProvider`] for it.
+    ///
+    /// `table` only supplies the column definitions: its Arrow schema is
+    /// converted to an Iceberg schema (field IDs are assigned automatically),
+    /// and it must not contain any rows.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if a table with this name is already cached, if the
+    /// schema cannot be converted, if `table` contains data, if the catalog
+    /// rejects the creation, or if no Tokio runtime is available.
+    fn register_table(
+        &self,
+        name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> DFResult<Option<Arc<dyn TableProvider>>> {
+        // Check if table already exists
+        if self.table_exist(name.as_str()) {
+            return Err(DataFusionError::Execution(format!(
+                "Table {name} already exists"
+            )));
+        }
+
+        // Convert DataFusion schema to Iceberg schema. DataFusion schemas don't
+        // have field IDs, so we use the function that assigns them automatically.
+        let df_schema = table.schema();
+        let iceberg_schema = arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref())
+            .map_err(to_datafusion_error)?;
+
+        // Create the table in the Iceberg catalog
+        let table_creation = TableCreation::builder()
+            .name(name.clone())
+            .schema(iceberg_schema)
+            .build();
+
+        // `register_table` is synchronous while the catalog API is async, so the
+        // work is driven on the ambient Tokio runtime. Look the runtime up
+        // fallibly: `tokio::task::spawn_blocking` panics when called outside a
+        // runtime, which would abort the caller instead of returning an error.
+        let handle = tokio::runtime::Handle::try_current().map_err(|e| {
+            DataFusionError::Execution(format!(
+                "Failed to create Iceberg table: no Tokio runtime available: {e}"
+            ))
+        })?;
+        let task_handle = handle.clone();
+
+        let catalog = self.catalog.clone();
+        let namespace = self.namespace.clone();
+        let tables = self.tables.clone();
+
+        let result = handle.spawn_blocking(move || {
+            task_handle.block_on(async move {
+                // Verify the input table is empty - CREATE TABLE only accepts a
+                // schema definition
+                ensure_table_is_empty(&table)
+                    .await
+                    .map_err(to_datafusion_error)?;
+
+                catalog
+                    .create_table(&namespace, table_creation)
+                    .await
+                    .map_err(to_datafusion_error)?;
+
+                // Create a new table provider using the catalog reference
+                let table_provider =
+                    IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), name.clone())
+                        .await
+                        .map_err(to_datafusion_error)?;
+
+                // Store the new table provider
+                tables.insert(name, Arc::new(table_provider));
+
+                Ok(None)
+            })
+        });
+
+        // Wait for the spawned task. This blocks the calling thread until the
+        // table has been created (or creation failed).
+        // NOTE(review): on a current-thread runtime, `Handle::block_on` running on
+        // the blocking pool cannot drive Tokio's IO/timer drivers while the runtime
+        // thread is parked here, so catalogs that rely on Tokio IO may hang.
+        // Confirm which runtime flavors callers are expected to use.
+        futures::executor::block_on(result).map_err(|e| {
+            DataFusionError::Execution(format!("Failed to create Iceberg table: {e}"))
+        })?
+    }
+}
+
+/// Verifies that a table provider contains no data by scanning with LIMIT 1.
+///
+/// Every output partition of the scan is read (not only the first one), and
+/// any error produced while reading the scan is propagated instead of being
+/// treated as "no data".
+///
+/// # Errors
+///
+/// Returns an error if the scan cannot be planned or executed, if reading a
+/// batch fails, or if the table contains at least one row.
+async fn ensure_table_is_empty(table: &Arc<dyn TableProvider>) -> Result<()> {
+    let session_ctx = SessionContext::new();
+    let exec_plan = table
+        .scan(&session_ctx.state(), None, &[], Some(1))
+        .await
+        .map_err(|e| Error::new(ErrorKind::Unexpected, format!("Failed to scan table: {e}")))?;
+
+    let task_ctx: Arc<TaskContext> = session_ctx.task_ctx();
+    // Check every output partition: executing only partition 0 would miss rows
+    // held in any other partition (e.g. a MemTable with several partitions).
+    // Partitions are read one after another on the current task rather than
+    // through a merging operator.
+    let partition_count = exec_plan.properties().output_partitioning().partition_count();
+    for partition in 0..partition_count {
+        let mut stream = exec_plan
+            .execute(partition, Arc::clone(&task_ctx))
+            .map_err(|e| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    format!("Failed to execute scan: {e}"),
+                )
+            })?;
+
+        // Stop at the first non-empty batch, and surface read errors instead of
+        // skipping them (a failing scan must not pass as "empty").
+        while let Some(batch) = stream.next().await {
+            let batch = batch.map_err(|e| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    format!("Failed to read scan results: {e}"),
+                )
+            })?;
+            if batch.num_rows() > 0 {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "register_table does not support tables with data.",
+                ));
+            }
+        }
+    }
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod ensure_table_is_empty_tests {
+    use std::sync::Arc;
+
+    use datafusion::arrow::array::Int32Array;
+    use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+    use datafusion::arrow::record_batch::RecordBatch;
+    use datafusion::datasource::{MemTable, TableProvider};
+
+    use super::ensure_table_is_empty;
+
+    /// Rows that live only in a later partition must still be detected.
+    #[tokio::test]
+    async fn detects_rows_outside_the_first_partition() {
+        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            "id",
+            DataType::Int32,
+            false,
+        )]));
+        let empty = RecordBatch::new_empty(schema.clone());
+        let data =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1]))])
+                .unwrap();
+        let table: Arc<dyn TableProvider> =
+            Arc::new(MemTable::try_new(schema, vec![vec![empty], vec![data]]).unwrap());
+
+        assert!(ensure_table_is_empty(&table).await.is_err());
+    }
+}
+
+// Tests for `IcebergSchemaProvider::register_table`, backed by an in-memory catalog.
+// NOTE(review): each test calls the synchronous `register_table` from inside a
+// `#[tokio::test]` async fn; confirm the runtime flavor matches production callers.
+#[cfg(test)]
+mod tests {
+ use std::collections::HashMap;
+ use std::sync::Arc;
+
+ use datafusion::arrow::array::{Int32Array, StringArray};
+ use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+ use datafusion::arrow::record_batch::RecordBatch;
+ use datafusion::datasource::MemTable;
+ use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+ use iceberg::{Catalog, CatalogBuilder, NamespaceIdent};
+ use tempfile::TempDir;
+
+ use super::*;
+
+ // Builds a schema provider over a fresh in-memory catalog whose warehouse is a
+ // temporary directory, with an empty `test_ns` namespace. The `TempDir` is
+ // returned so callers keep the warehouse directory alive for the whole test.
+ async fn create_test_schema_provider() -> (IcebergSchemaProvider, TempDir)
{
+ let temp_dir = TempDir::new().unwrap();
+ let warehouse_path = temp_dir.path().to_str().unwrap().to_string();
+
+ let catalog = MemoryCatalogBuilder::default()
+ .load(
+ "memory",
+ HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(),
warehouse_path.clone())]),
+ )
+ .await
+ .unwrap();
+
+ let namespace = NamespaceIdent::new("test_ns".to_string());
+ catalog
+ .create_namespace(&namespace, HashMap::new())
+ .await
+ .unwrap();
+
+ let provider = IcebergSchemaProvider::try_new(Arc::new(catalog),
namespace)
+ .await
+ .unwrap();
+
+ (provider, temp_dir)
+ }
+
+ #[tokio::test]
+ async fn test_register_table_with_data_fails() {
+ let (schema_provider, _temp_dir) = create_test_schema_provider().await;
+
+ // Create a MemTable with data
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, true),
+ ]));
+
+ let batch = RecordBatch::try_new(arrow_schema.clone(), vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3])),
+ Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
+ ])
+ .unwrap();
+
+ let mem_table = MemTable::try_new(arrow_schema,
vec![vec![batch]]).unwrap();
+
+ // Attempt to register the table with data - should fail
+ let result = schema_provider.register_table("test_table".to_string(),
Arc::new(mem_table));
+
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("register_table does not support tables with data."),
+ "Expected error about tables with data, got: {err}",
+ );
+ }
+
+ #[tokio::test]
+ async fn test_register_empty_table_succeeds() {
+ let (schema_provider, _temp_dir) = create_test_schema_provider().await;
+
+ // Create an empty MemTable (schema only, no data rows)
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, true),
+ ]));
+
+ // Create an empty batch (0 rows) - MemTable requires at least one partition
+ let empty_batch = RecordBatch::new_empty(arrow_schema.clone());
+ let mem_table = MemTable::try_new(arrow_schema,
vec![vec![empty_batch]]).unwrap();
+
+ // Attempt to register the empty table - should succeed
+ let result = schema_provider.register_table("empty_table".to_string(),
Arc::new(mem_table));
+
+ assert!(result.is_ok(), "Expected success, got: {result:?}");
+
+ // Verify the table was registered
+ assert!(schema_provider.table_exist("empty_table"));
+ }
+
+ #[tokio::test]
+ async fn test_register_duplicate_table_fails() {
+ let (schema_provider, _temp_dir) = create_test_schema_provider().await;
+
+ // Create empty MemTables
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
+ "id",
+ DataType::Int32,
+ false,
+ )]));
+
+ let empty_batch1 = RecordBatch::new_empty(arrow_schema.clone());
+ let empty_batch2 = RecordBatch::new_empty(arrow_schema.clone());
+ let mem_table1 = MemTable::try_new(arrow_schema.clone(),
vec![vec![empty_batch1]]).unwrap();
+ let mem_table2 = MemTable::try_new(arrow_schema,
vec![vec![empty_batch2]]).unwrap();
+
+ // Register first table - should succeed
+ let result1 = schema_provider.register_table("dup_table".to_string(),
Arc::new(mem_table1));
+ assert!(result1.is_ok());
+
+ // Register second table with same name - should fail (rejected by the
+ // cached-name check before the catalog is contacted)
+ let result2 = schema_provider.register_table("dup_table".to_string(),
Arc::new(mem_table2));
+ assert!(result2.is_err());
+ let err = result2.unwrap_err();
+ assert!(
+ err.to_string().contains("already exists"),
+ "Expected error about table already existing, got: {err}",
+ );
}
}
diff --git a/crates/sqllogictest/src/engine/datafusion.rs
b/crates/sqllogictest/src/engine/datafusion.rs
index e9f93287d..487d8dc97 100644
--- a/crates/sqllogictest/src/engine/datafusion.rs
+++ b/crates/sqllogictest/src/engine/datafusion.rs
@@ -93,8 +93,7 @@ impl DataFusionEngine {
let namespace = NamespaceIdent::new("default".to_string());
catalog.create_namespace(&namespace, HashMap::new()).await?;
- // Create test tables
- Self::create_unpartitioned_table(&catalog, &namespace).await?;
+ // Only the partitioned test table is created here; unpartitioned test tables are created by the .slt files via CREATE TABLE
Self::create_partitioned_table(&catalog, &namespace).await?;
Ok(Arc::new(
@@ -102,35 +101,9 @@ impl DataFusionEngine {
))
}
- /// Create an unpartitioned test table with id and name columns
- /// TODO: this can be removed when we support CREATE TABLE
- async fn create_unpartitioned_table(
- catalog: &impl Catalog,
- namespace: &NamespaceIdent,
- ) -> anyhow::Result<()> {
- let schema = Schema::builder()
- .with_fields(vec![
- NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
- NestedField::optional(2, "name",
Type::Primitive(PrimitiveType::String)).into(),
- ])
- .build()?;
-
- catalog
- .create_table(
- namespace,
- TableCreation::builder()
- .name("test_unpartitioned_table".to_string())
- .schema(schema)
- .build(),
- )
- .await?;
-
- Ok(())
- }
-
/// Create a partitioned test table with id, category, and value columns
/// Partitioned by category using identity transform
- /// TODO: this can be removed when we support CREATE TABLE
+ /// TODO: this can be removed when we support CREATE EXTERNAL TABLE
async fn create_partitioned_table(
catalog: &impl Catalog,
namespace: &NamespaceIdent,
diff --git a/crates/sqllogictest/testdata/schedules/df_test.toml
b/crates/sqllogictest/testdata/schedules/df_test.toml
index df5e638d5..1d7f42c8d 100644
--- a/crates/sqllogictest/testdata/schedules/df_test.toml
+++ b/crates/sqllogictest/testdata/schedules/df_test.toml
@@ -22,6 +22,10 @@ df = { type = "datafusion" }
engine = "df"
slt = "df_test/show_tables.slt"
+[[steps]]
+engine = "df"
+slt = "df_test/create_table.slt"
+
[[steps]]
engine = "df"
slt = "df_test/insert_into.slt"
diff --git a/crates/sqllogictest/testdata/slts/df_test/create_table.slt
b/crates/sqllogictest/testdata/slts/df_test/create_table.slt
new file mode 100644
index 000000000..2eab1b6ba
--- /dev/null
+++ b/crates/sqllogictest/testdata/slts/df_test/create_table.slt
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test CREATE TABLE with explicit schema
+statement ok
+CREATE TABLE default.default.empty_table (id INT NOT NULL, name STRING)
+
+# Verify the empty table exists and has correct schema
+query IT rowsort
+SELECT * FROM default.default.empty_table
+----
+
+# Creating a table whose name is already taken must fail
+statement error
+CREATE TABLE default.default.empty_table (id INT NOT NULL, name STRING)
+
+# Insert data into the created table
+query I
+INSERT INTO default.default.empty_table VALUES (1, 'Alice')
+----
+1
+
+# Verify the inserted data
+query IT rowsort
+SELECT * FROM default.default.empty_table
+----
+1 Alice
+
+# Insert multiple rows
+query I
+INSERT INTO default.default.empty_table VALUES (2, 'Bob'), (3, 'Charlie')
+----
+2
+
+# Verify all rows
+query IT rowsort
+SELECT * FROM default.default.empty_table
+----
+1 Alice
+2 Bob
+3 Charlie
+
+# Test CREATE TABLE with different column types
+statement ok
+CREATE TABLE default.default.typed_table (id BIGINT NOT NULL, value DOUBLE, flag BOOLEAN)
+
+# Verify the typed table exists (column types: I = integer, R = float, B = boolean)
+query IRB rowsort
+SELECT * FROM default.default.typed_table
+----
+
+# Insert data with different types
+query I
+INSERT INTO default.default.typed_table VALUES (100, 3.14, true), (200, 2.71, false)
+----
+2
+
+# Verify typed data
+query IRB rowsort
+SELECT * FROM default.default.typed_table
+----
+100 3.14 true
+200 2.71 false
+
+# Test CREATE TABLE with nullable columns
+statement ok
+CREATE TABLE default.default.nullable_table (id INT NOT NULL, optional_name STRING)
+
+# Insert with NULL value
+query I
+INSERT INTO default.default.nullable_table VALUES (1, 'Value'), (2, NULL)
+----
+2
+
+# Verify NULL handling
+query IT rowsort
+SELECT * FROM default.default.nullable_table
+----
+1 Value
+2 NULL
diff --git a/crates/sqllogictest/testdata/slts/df_test/insert_into.slt
b/crates/sqllogictest/testdata/slts/df_test/insert_into.slt
index 2ba33afcd..1e0784432 100644
--- a/crates/sqllogictest/testdata/slts/df_test/insert_into.slt
+++ b/crates/sqllogictest/testdata/slts/df_test/insert_into.slt
@@ -15,6 +15,10 @@
# specific language governing permissions and limitations
# under the License.
+# Create the unpartitioned test table via SQL; the test engine no longer creates it during setup
+statement ok
+CREATE TABLE default.default.test_unpartitioned_table (id INT NOT NULL, name
STRING)
+
# Verify the table is initially empty
query IT rowsort
SELECT * FROM default.default.test_unpartitioned_table
diff --git a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
index c5da5f627..770072f9d 100644
--- a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
+++ b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
@@ -28,9 +28,6 @@ datafusion information_schema views VIEW
default default test_partitioned_table BASE TABLE
default default test_partitioned_table$manifests BASE TABLE
default default test_partitioned_table$snapshots BASE TABLE
-default default test_unpartitioned_table BASE TABLE
-default default test_unpartitioned_table$manifests BASE TABLE
-default default test_unpartitioned_table$snapshots BASE TABLE
default information_schema columns VIEW
default information_schema df_settings VIEW
default information_schema parameters VIEW