liurenjie1024 commented on code in PR #2029:
URL: https://github.com/apache/iceberg-rust/pull/2029#discussion_r2696436619
##########
website/src/datafusion.md:
##########
@@ -0,0 +1,218 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+-->
+
+# DataFusion Integration
+
+The `iceberg-datafusion` crate provides integration between Apache Iceberg and [DataFusion](https://datafusion.apache.org/), enabling SQL queries on Iceberg tables.
+
+## Features
+
+- **SQL DDL/DML**: `CREATE TABLE`, `INSERT INTO`, `SELECT`
+- **Query Optimization**: Projection, filter, and LIMIT pushdown
+- **Metadata Tables**: Query snapshots and manifests
+- **Partitioned Tables**: Automatic partition routing for writes
+
+## Dependencies
+
+Add the following to your `Cargo.toml`:
+
+```toml
+[dependencies]
+iceberg = "0.8"
+iceberg-datafusion = "0.8"
+datafusion = "51"
+tokio = { version = "1", features = ["full"] }
+```
+
+## Catalog-Based Access
+
+The recommended way to use Iceberg with DataFusion is through `IcebergCatalogProvider`, which integrates an Iceberg catalog with DataFusion's catalog system.
+
+### Setup
+
+```rust,no_run,noplayground
+{{#rustdoc_include ../../crates/examples/src/datafusion_integration.rs:catalog_setup}}
+```
+
+### Creating Tables
+
+Once the catalog is registered, you can create tables using SQL:
+
+```rust,no_run,noplayground
+{{#rustdoc_include ../../crates/examples/src/datafusion_integration.rs:create_table}}
+```
+
+Supported column types include:
+- `INT`, `BIGINT` - Integer types
+- `FLOAT`, `DOUBLE` - Floating point types
+- `STRING` - String/text type
+- `BOOLEAN` - Boolean type
+- `DATE`, `TIMESTAMP` - Date/time types
+
+> **Note**: `CREATE TABLE AS SELECT` is not currently supported. Create the table first, then use `INSERT INTO`.
+
+### Inserting Data
+
+```rust,no_run,noplayground
+{{#rustdoc_include ../../crates/examples/src/datafusion_integration.rs:insert_data}}
+```
+
+For nested structs, use the `named_struct()` function:
+
+```sql
+INSERT INTO catalog.namespace.table
+SELECT
+    1 as id,
+    named_struct('street', '123 Main St', 'city', 'NYC') as address
+```
+
+### Querying Data
+
+```rust,no_run,noplayground
+{{#rustdoc_include ../../crates/examples/src/datafusion_integration.rs:query_data}}
+```
+
+## Metadata Tables
+
+Iceberg metadata tables can be queried using the `$` syntax (following Flink convention):
+
+```rust,no_run,noplayground
+{{#rustdoc_include ../../crates/examples/src/datafusion_integration.rs:metadata_tables}}
+```
+
+Available metadata tables:
+- `table$snapshots` - Table snapshot history
+- `table$manifests` - Manifest file information
+
+## File-Based Access (External Tables)
+
+For reading existing Iceberg tables without a catalog, use `IcebergTableProviderFactory`:
+
+```rust,no_run,noplayground
+{{#rustdoc_include ../../crates/examples/src/datafusion_integration.rs:external_table_setup}}
+```
+
+Then create external tables via SQL:
+
+```sql
+CREATE EXTERNAL TABLE my_table
+STORED AS ICEBERG
+LOCATION '/path/to/iceberg/metadata/v1.metadata.json';
+
+SELECT * FROM my_table;
+```
+
+> **Note**: External tables are read-only. For write operations, use `IcebergCatalogProvider`.
+
+## Table Provider Types
+
+### IcebergTableProvider
+
+- Backed by an Iceberg catalog
+- Automatically refreshes metadata on each operation
+- Supports both read and write operations
+- Use when you need the latest table state or write capability
+
+### IcebergStaticTableProvider
+
+- Fixed table snapshot at construction time
+- No catalog round-trips (better performance)
+- Read-only
+- Use for time-travel queries or when consistency within a query is important
+
+## Partitioned Tables
+
+### Creating Partitioned Tables
+
+Partitioned tables must be created using the Iceberg catalog API (not SQL):
+
+```rust,no_run
+use iceberg::spec::{Transform, UnboundPartitionSpec};
+
+let partition_spec = UnboundPartitionSpec::builder()
+    .with_spec_id(0)
+    .add_partition_field(column_id, "partition_column", Transform::Identity)?
+    .build();
+```
+
+Supported partition transforms:
+- `Identity` - Partition by exact value
+- `Year`, `Month`, `Day`, `Hour` - Time-based partitioning
+- `Bucket(n)` - Hash partitioning into n buckets
+- `Truncate(width)` - String/number truncation
+
+### Writing to Partitioned Tables
+
+When inserting into a partitioned table, data is automatically routed to the correct partition directories:
+
+```sql
+INSERT INTO catalog.namespace.partitioned_table VALUES
+    (1, 'electronics', 'laptop'),
+    (2, 'books', 'novel');
+-- Data files will be created under:
+-- data/category=electronics/
+-- data/category=books/
+```
+
+### Write Modes

Review Comment:
   Seems we don't have a good way to update table property?
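[Editor's sketch, not part of the PR: one way to check the partition routing described in the hunk above is to read the `$manifests` metadata table after the `INSERT`. This reuses only the columns the document itself lists; `catalog.namespace.partitioned_table` is the hunk's illustrative name, not a real table.]

```sql
-- After the partitioned INSERT above, the new data files per partition
-- are tracked in manifests; list them via the $manifests metadata table.
SELECT path, added_data_files_count
FROM catalog.namespace.partitioned_table$manifests;
```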
##########
website/src/datafusion.md:
##########
@@ -0,0 +1,218 @@
[... same hunk as quoted above, continuing through `### Write Modes` ...]
+
+Two write modes are available for partitioned tables:
+
+| Mode | Property Value | Description |
+|------|---------------|-------------|
+| **Fanout** (default) | `true` | Handles unsorted data, maintains open writers for all partitions |
+| **Clustered** | `false` | Requires sorted input, more memory efficient |
+
+Configure via table property:
+```
+write.datafusion.fanout.enabled = true
+```
+
+## Query Optimization

Review Comment:
   I prefer to remove this part. This is for developers, not users.

##########
crates/examples/src/datafusion_integration.rs:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Example demonstrating DataFusion integration with Apache Iceberg.
+//!
+//! This example shows how to:
+//! - Set up an Iceberg catalog with DataFusion
+//! - Create tables using SQL
+//! - Insert and query data
+//! - Query metadata tables
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::session_state::SessionStateBuilder;
+use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+use iceberg::{Catalog, CatalogBuilder, NamespaceIdent};
+use iceberg_datafusion::{IcebergCatalogProvider, IcebergTableProviderFactory};
+use tempfile::TempDir;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // Create a temporary directory for the warehouse
+    let temp_dir = TempDir::new()?;
+    let warehouse_path = temp_dir.path().to_str().unwrap().to_string();
+
+    // ANCHOR: catalog_setup
+    // Create an in-memory Iceberg catalog
+    let iceberg_catalog = MemoryCatalogBuilder::default()
+        .load(
+            "memory",
+            HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path)]),
+        )
+        .await?;
+
+    // Create a namespace for our tables
+    let namespace = NamespaceIdent::new("demo".to_string());
+    iceberg_catalog
+        .create_namespace(&namespace, HashMap::new())
+        .await?;
+
+    // Create the IcebergCatalogProvider and register it with DataFusion
+    let catalog_provider =
+        Arc::new(IcebergCatalogProvider::try_new(Arc::new(iceberg_catalog)).await?);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog("iceberg", catalog_provider);
+    // ANCHOR_END: catalog_setup
+
+    // ANCHOR: create_table
+    // Create a table using SQL
+    ctx.sql(
+        "CREATE TABLE iceberg.demo.users (
+            id INT NOT NULL,
+            name STRING NOT NULL,
+            email STRING
+        )",
+    )
+    .await?;
+
+    println!("Table 'users' created successfully.");
+    // ANCHOR_END: create_table
+
+    // ANCHOR: insert_data
+    // Insert data into the table
+    let result = ctx
+        .sql(
+            "INSERT INTO iceberg.demo.users VALUES
+                (1, 'Alice', '[email protected]'),
+                (2, 'Bob', '[email protected]'),
+                (3, 'Charlie', NULL)",
+        )
+        .await?
+        .collect()
+        .await?;
+
+    // The result contains the number of rows inserted
+    println!("Inserted {} rows.", result[0].num_rows());
+    // ANCHOR_END: insert_data
+
+    // ANCHOR: query_data
+    // Query the data with filtering
+    println!("\nQuerying users with email:");
+    let df = ctx
+        .sql("SELECT id, name, email FROM iceberg.demo.users WHERE email IS NOT NULL")
+        .await?;
+
+    df.show().await?;
+
+    // Query with projection (only specific columns)
+    println!("\nQuerying only names:");
+    let df = ctx
+        .sql("SELECT name FROM iceberg.demo.users ORDER BY id")
+        .await?;
+
+    df.show().await?;
+    // ANCHOR_END: query_data
+
+    // ANCHOR: metadata_tables
+    // Query the snapshots metadata table
+    println!("\nTable snapshots:");
+    let df = ctx
+        .sql("SELECT snapshot_id, operation FROM iceberg.demo.users$snapshots")
+        .await?;
+
+    df.show().await?;
+
+    // Query the manifests metadata table
+    println!("\nTable manifests:");
+    let df = ctx
+        .sql("SELECT path, added_data_files_count FROM iceberg.demo.users$manifests")
+        .await?;
+
+    df.show().await?;
+    // ANCHOR_END: metadata_tables
+
+    println!("\nDataFusion integration example completed successfully!");
+
+    Ok(())
+}
+
+// ANCHOR: external_table_setup
+/// Example of setting up IcebergTableProviderFactory for external tables.
+///
+/// This allows reading existing Iceberg tables via `CREATE EXTERNAL TABLE` syntax.
+#[allow(dead_code)]
+async fn setup_external_table_support() -> SessionContext {

Review Comment:
   I prefer to move the external table example to another module.

##########
crates/examples/src/datafusion_integration.rs:
##########
@@ -0,0 +1,170 @@
[... same example file hunk as quoted above, through `ctx.register_catalog("iceberg", catalog_provider);` ...]

Review Comment:
   ```suggestion
       ctx.register_catalog("my_catalog", catalog_provider);
   ```
   A non-special name, to tell the user that this name has no special meaning.

##########
website/src/datafusion.md:
##########
@@ -0,0 +1,218 @@
[... same hunk as quoted above, continuing through `## Partitioned Tables` ...]

Review Comment:
   I prefer not to include this part. DataFusion integration is primarily for DataFusion users who want a high-level API.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
