This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new bf984c754 feat(datafusion): Split IcebergTableProvider into static and non-static table provider (#1879)
bf984c754 is described below

commit bf984c754630cd4b536853c81528b9b69a3dab4d
Author: Shawn Chang <[email protected]>
AuthorDate: Thu Nov 27 04:50:40 2025 -0800

    feat(datafusion): Split IcebergTableProvider into static and non-static table provider (#1879)
---
 bindings/python/Cargo.lock                         |   1 +
 bindings/python/src/datafusion_table_provider.rs   |   6 +-
 .../tests/shared_tests/datafusion.rs               |   4 +-
 crates/integrations/datafusion/src/schema.rs       |   6 +-
 crates/integrations/datafusion/src/table/mod.rs    | 468 +++++++++++++++++----
 .../datafusion/src/table/table_provider_factory.rs |   8 +-
 .../tests/integration_datafusion_test.rs           |  12 -
 7 files changed, 390 insertions(+), 115 deletions(-)

diff --git a/bindings/python/Cargo.lock b/bindings/python/Cargo.lock
index 8249414b8..814c9afb3 100644
--- a/bindings/python/Cargo.lock
+++ b/bindings/python/Cargo.lock
@@ -2313,6 +2313,7 @@ dependencies = [
  "chrono",
  "derive_builder",
  "expect-test",
+ "flate2",
  "fnv",
  "futures",
  "itertools 0.13.0",
diff --git a/bindings/python/src/datafusion_table_provider.rs 
b/bindings/python/src/datafusion_table_provider.rs
index b5e1bf952..8db7223b3 100644
--- a/bindings/python/src/datafusion_table_provider.rs
+++ b/bindings/python/src/datafusion_table_provider.rs
@@ -23,7 +23,7 @@ use datafusion_ffi::table_provider::FFI_TableProvider;
 use iceberg::TableIdent;
 use iceberg::io::FileIO;
 use iceberg::table::StaticTable;
-use iceberg_datafusion::table::IcebergTableProvider;
+use iceberg_datafusion::table::IcebergStaticTableProvider;
 use pyo3::exceptions::PyRuntimeError;
 use pyo3::prelude::*;
 use pyo3::types::PyCapsule;
@@ -32,7 +32,7 @@ use crate::runtime::runtime;
 
 #[pyclass(name = "IcebergDataFusionTable")]
 pub struct PyIcebergDataFusionTable {
-    inner: Arc<IcebergTableProvider>,
+    inner: Arc<IcebergStaticTableProvider>,
 }
 
 #[pymethods]
@@ -69,7 +69,7 @@ impl PyIcebergDataFusionTable {
 
             let table = static_table.into_table();
 
-            IcebergTableProvider::try_new_from_table(table)
+            IcebergStaticTableProvider::try_new_from_table(table)
                 .await
                 .map_err(|e| {
                     PyRuntimeError::new_err(format!("Failed to create table 
provider: {e}"))
diff --git a/crates/integration_tests/tests/shared_tests/datafusion.rs 
b/crates/integration_tests/tests/shared_tests/datafusion.rs
index 81bbb5f54..60dd9f36c 100644
--- a/crates/integration_tests/tests/shared_tests/datafusion.rs
+++ b/crates/integration_tests/tests/shared_tests/datafusion.rs
@@ -26,7 +26,7 @@ use datafusion::error::DataFusionError;
 use datafusion::prelude::SessionContext;
 use iceberg::{Catalog, CatalogBuilder, TableIdent};
 use iceberg_catalog_rest::RestCatalogBuilder;
-use iceberg_datafusion::IcebergTableProvider;
+use iceberg_datafusion::IcebergStaticTableProvider;
 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 
 use crate::get_shared_containers;
@@ -47,7 +47,7 @@ async fn test_basic_queries() -> Result<(), DataFusionError> {
     let ctx = SessionContext::new();
 
     let table_provider = Arc::new(
-        IcebergTableProvider::try_new_from_table(table)
+        IcebergStaticTableProvider::try_new_from_table(table)
             .await
             .unwrap(),
     );
diff --git a/crates/integrations/datafusion/src/schema.rs 
b/crates/integrations/datafusion/src/schema.rs
index 3920ee73c..31bbdbd67 100644
--- a/crates/integrations/datafusion/src/schema.rs
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -28,6 +28,7 @@ use iceberg::inspect::MetadataTableType;
 use iceberg::{Catalog, NamespaceIdent, Result};
 
 use crate::table::IcebergTableProvider;
+use crate::to_datafusion_error;
 
 /// Represents a [`SchemaProvider`] for the Iceberg [`Catalog`], managing
 /// access to table providers within a specific namespace.
@@ -113,7 +114,10 @@ impl SchemaProvider for IcebergSchemaProvider {
             let metadata_table_type =
                 
MetadataTableType::try_from(metadata_table_name).map_err(DataFusionError::Plan)?;
             if let Some(table) = self.tables.get(table_name) {
-                let metadata_table = table.metadata_table(metadata_table_type);
+                let metadata_table = table
+                    .metadata_table(metadata_table_type)
+                    .await
+                    .map_err(to_datafusion_error)?;
                 return Ok(Some(Arc::new(metadata_table)));
             } else {
                 return Ok(None);
diff --git a/crates/integrations/datafusion/src/table/mod.rs 
b/crates/integrations/datafusion/src/table/mod.rs
index 42a3baad3..8527668d6 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -15,6 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! Iceberg table providers for DataFusion.
+//!
+//! This module provides two table provider implementations:
+//!
+//! - [`IcebergTableProvider`]: Catalog-backed provider with automatic metadata refresh.
+//!   Use for write operations and when you need to see the latest table state.
+//!
+//! - [`IcebergStaticTableProvider`]: Static provider for read-only access to a specific
+//!   table snapshot. Use for consistent analytical queries or time-travel scenarios.
+
 pub mod metadata_table;
 pub mod table_provider_factory;
 
@@ -38,98 +48,61 @@ use iceberg::table::Table;
 use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
 use metadata_table::IcebergMetadataTableProvider;
 
+use crate::error::to_datafusion_error;
 use crate::physical_plan::commit::IcebergCommitExec;
 use crate::physical_plan::project::project_with_partition;
 use crate::physical_plan::repartition::repartition;
 use crate::physical_plan::scan::IcebergTableScan;
 use crate::physical_plan::write::IcebergWriteExec;
 
-/// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
-/// managing access to a [`Table`].
+/// Catalog-backed table provider with automatic metadata refresh.
+///
+/// This provider loads fresh table metadata from the catalog on every scan and write
+/// operation, ensuring you always see the latest table state. Use this when you need
+/// write operations or want to see the most up-to-date data.
+///
+/// For read-only access to a specific snapshot without catalog overhead, use
+/// [`IcebergStaticTableProvider`] instead.
 #[derive(Debug, Clone)]
 pub struct IcebergTableProvider {
-    /// A table in the catalog.
-    table: Table,
-    /// Table snapshot id that will be queried via this provider.
-    snapshot_id: Option<i64>,
-    /// A reference-counted arrow `Schema`.
+    /// The catalog that manages this table
+    catalog: Arc<dyn Catalog>,
+    /// The table identifier (namespace + name)
+    table_ident: TableIdent,
+    /// A reference-counted arrow `Schema` (cached at construction)
     schema: ArrowSchemaRef,
-    /// The catalog that the table belongs to.
-    catalog: Option<Arc<dyn Catalog>>,
 }
 
 impl IcebergTableProvider {
-    pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
-        IcebergTableProvider {
-            table,
-            snapshot_id: None,
-            schema,
-            catalog: None,
-        }
-    }
-    /// Asynchronously tries to construct a new [`IcebergTableProvider`]
-    /// using the given client and table name to fetch an actual [`Table`]
-    /// in the provided namespace.
+    /// Creates a new catalog-backed table provider.
+    ///
+    /// Loads the table once to get the initial schema, then stores the catalog
+    /// reference for future metadata refreshes on each operation.
     pub(crate) async fn try_new(
-        client: Arc<dyn Catalog>,
+        catalog: Arc<dyn Catalog>,
         namespace: NamespaceIdent,
         name: impl Into<String>,
     ) -> Result<Self> {
-        let ident = TableIdent::new(namespace, name.into());
-        let table = client.load_table(&ident).await?;
+        let table_ident = TableIdent::new(namespace, name.into());
 
+        // Load table once to get initial schema
+        let table = catalog.load_table(&table_ident).await?;
         let schema = 
Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
 
         Ok(IcebergTableProvider {
-            table,
-            snapshot_id: None,
-            schema,
-            catalog: Some(client),
-        })
-    }
-
-    /// Asynchronously tries to construct a new [`IcebergTableProvider`]
-    /// using the given table. Can be used to create a table provider from an 
existing table regardless of the catalog implementation.
-    pub async fn try_new_from_table(table: Table) -> Result<Self> {
-        let schema = 
Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
-        Ok(IcebergTableProvider {
-            table,
-            snapshot_id: None,
-            schema,
-            catalog: None,
-        })
-    }
-
-    /// Asynchronously tries to construct a new [`IcebergTableProvider`]
-    /// using a specific snapshot of the given table. Can be used to create a 
table provider from an existing table regardless of the catalog implementation.
-    pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) 
-> Result<Self> {
-        let snapshot = table
-            .metadata()
-            .snapshot_by_id(snapshot_id)
-            .ok_or_else(|| {
-                Error::new(
-                    ErrorKind::Unexpected,
-                    format!(
-                        "snapshot id {snapshot_id} not found in table {}",
-                        table.identifier().name()
-                    ),
-                )
-            })?;
-        let schema = snapshot.schema(table.metadata())?;
-        let schema = Arc::new(schema_to_arrow_schema(&schema)?);
-        Ok(IcebergTableProvider {
-            table,
-            snapshot_id: Some(snapshot_id),
+            catalog,
+            table_ident,
             schema,
-            catalog: None,
         })
     }
 
-    pub(crate) fn metadata_table(&self, r#type: MetadataTableType) -> 
IcebergMetadataTableProvider {
-        IcebergMetadataTableProvider {
-            table: self.table.clone(),
-            r#type,
-        }
+    pub(crate) async fn metadata_table(
+        &self,
+        r#type: MetadataTableType,
+    ) -> Result<IcebergMetadataTableProvider> {
+        // Load fresh table metadata for metadata table access
+        let table = self.catalog.load_table(&self.table_ident).await?;
+        Ok(IcebergMetadataTableProvider { table, r#type })
     }
 }
 
@@ -154,9 +127,17 @@ impl TableProvider for IcebergTableProvider {
         filters: &[Expr],
         _limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        // Load fresh table metadata from catalog
+        let table = self
+            .catalog
+            .load_table(&self.table_ident)
+            .await
+            .map_err(to_datafusion_error)?;
+
+        // Create scan with fresh metadata (always use current snapshot)
         Ok(Arc::new(IcebergTableScan::new(
-            self.table.clone(),
-            self.snapshot_id,
+            table,
+            None, // Always use current snapshot for catalog-backed provider
             self.schema.clone(),
             projection,
             filters,
@@ -177,17 +158,18 @@ impl TableProvider for IcebergTableProvider {
         input: Arc<dyn ExecutionPlan>,
         _insert_op: InsertOp,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        let Some(catalog) = self.catalog.clone() else {
-            return Err(DataFusionError::Execution(
-                "Catalog cannot be none for insert_into".to_string(),
-            ));
-        };
+        // Load fresh table metadata from catalog
+        let table = self
+            .catalog
+            .load_table(&self.table_ident)
+            .await
+            .map_err(to_datafusion_error)?;
 
-        let partition_spec = self.table.metadata().default_partition_spec();
+        let partition_spec = table.metadata().default_partition_spec();
 
         // Step 1: Project partition values for partitioned tables
         let plan_with_partition = if !partition_spec.is_unpartitioned() {
-            project_with_partition(input, &self.table)?
+            project_with_partition(input, &table)?
         } else {
             input
         };
@@ -200,14 +182,11 @@ impl TableProvider for IcebergTableProvider {
                 )
             })?;
 
-        let repartitioned_plan = repartition(
-            plan_with_partition,
-            self.table.metadata_ref(),
-            target_partitions,
-        )?;
+        let repartitioned_plan =
+            repartition(plan_with_partition, table.metadata_ref(), 
target_partitions)?;
 
         let write_plan = Arc::new(IcebergWriteExec::new(
-            self.table.clone(),
+            table.clone(),
             repartitioned_plan,
             self.schema.clone(),
         ));
@@ -216,21 +195,139 @@ impl TableProvider for IcebergTableProvider {
         let coalesce_partitions = 
Arc::new(CoalescePartitionsExec::new(write_plan));
 
         Ok(Arc::new(IcebergCommitExec::new(
-            self.table.clone(),
-            catalog,
+            table,
+            self.catalog.clone(),
             coalesce_partitions,
             self.schema.clone(),
         )))
     }
 }
 
+/// Static table provider for read-only snapshot access.
+///
+/// This provider holds a cached table instance and does not refresh metadata or support
+/// write operations. Use this for consistent analytical queries, time-travel scenarios,
+/// or when you want to avoid catalog overhead.
+///
+/// For catalog-backed tables with write support and automatic refresh, use
+/// [`IcebergTableProvider`] instead.
+#[derive(Debug, Clone)]
+pub struct IcebergStaticTableProvider {
+    /// The static table instance (never refreshed)
+    table: Table,
+    /// Optional snapshot ID for this static view
+    snapshot_id: Option<i64>,
+    /// A reference-counted arrow `Schema`
+    schema: ArrowSchemaRef,
+}
+
+impl IcebergStaticTableProvider {
+    /// Creates a static provider from a table instance.
+    ///
+    /// Uses the table's current snapshot for all queries. Does not support write operations.
+    pub async fn try_new_from_table(table: Table) -> Result<Self> {
+        let schema = 
Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
+        Ok(IcebergStaticTableProvider {
+            table,
+            snapshot_id: None,
+            schema,
+        })
+    }
+
+    /// Creates a static provider for a specific table snapshot.
+    ///
+    /// Queries the specified snapshot for all operations. Useful for time-travel queries.
+    /// Does not support write operations.
+    pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) 
-> Result<Self> {
+        let snapshot = table
+            .metadata()
+            .snapshot_by_id(snapshot_id)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    format!(
+                        "snapshot id {snapshot_id} not found in table {}",
+                        table.identifier().name()
+                    ),
+                )
+            })?;
+        let table_schema = snapshot.schema(table.metadata())?;
+        let schema = Arc::new(schema_to_arrow_schema(&table_schema)?);
+        Ok(IcebergStaticTableProvider {
+            table,
+            snapshot_id: Some(snapshot_id),
+            schema,
+        })
+    }
+}
+
+#[async_trait]
+impl TableProvider for IcebergStaticTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> ArrowSchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        // Use cached table (no refresh)
+        Ok(Arc::new(IcebergTableScan::new(
+            self.table.clone(),
+            self.snapshot_id,
+            self.schema.clone(),
+            projection,
+            filters,
+        )))
+    }
+
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
+        // Push down all filters as a single source of truth; the scanner will drop any filters that couldn't be pushed down.
+        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
+    }
+
+    async fn insert_into(
+        &self,
+        _state: &dyn Session,
+        _input: Arc<dyn ExecutionPlan>,
+        _insert_op: InsertOp,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        Err(to_datafusion_error(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Write operations are not supported on IcebergStaticTableProvider. 
\
+             Use IcebergTableProvider with a catalog for write support."
+                .to_string(),
+        )))
+    }
+}
+
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
     use datafusion::common::Column;
     use datafusion::prelude::SessionContext;
-    use iceberg::TableIdent;
     use iceberg::io::FileIO;
+    use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
     use iceberg::table::{StaticTable, Table};
+    use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, 
TableIdent};
+    use tempfile::TempDir;
 
     use super::*;
 
@@ -253,10 +350,59 @@ mod tests {
         static_table.into_table()
     }
 
+    async fn get_test_catalog_and_table() -> (Arc<dyn Catalog>, 
NamespaceIdent, String, TempDir) {
+        let temp_dir = TempDir::new().unwrap();
+        let warehouse_path = temp_dir.path().to_str().unwrap().to_string();
+
+        let catalog = MemoryCatalogBuilder::default()
+            .load(
+                "memory",
+                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), 
warehouse_path.clone())]),
+            )
+            .await
+            .unwrap();
+
+        let namespace = NamespaceIdent::new("test_ns".to_string());
+        catalog
+            .create_namespace(&namespace, HashMap::new())
+            .await
+            .unwrap();
+
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let table_creation = TableCreation::builder()
+            .name("test_table".to_string())
+            .location(format!("{}/test_table", warehouse_path))
+            .schema(schema)
+            .properties(HashMap::new())
+            .build();
+
+        catalog
+            .create_table(&namespace, table_creation)
+            .await
+            .unwrap();
+
+        (
+            Arc::new(catalog),
+            namespace,
+            "test_table".to_string(),
+            temp_dir,
+        )
+    }
+
+    // Tests for IcebergStaticTableProvider
+
     #[tokio::test]
-    async fn test_try_new_from_table() {
+    async fn test_static_provider_from_table() {
         let table = get_test_table_from_metadata_file().await;
-        let table_provider = 
IcebergTableProvider::try_new_from_table(table.clone())
+        let table_provider = 
IcebergStaticTableProvider::try_new_from_table(table.clone())
             .await
             .unwrap();
         let ctx = SessionContext::new();
@@ -278,11 +424,11 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_try_new_from_table_snapshot() {
+    async fn test_static_provider_from_snapshot() {
         let table = get_test_table_from_metadata_file().await;
         let snapshot_id = 
table.metadata().snapshots().next().unwrap().snapshot_id();
         let table_provider =
-            IcebergTableProvider::try_new_from_table_snapshot(table.clone(), 
snapshot_id)
+            
IcebergStaticTableProvider::try_new_from_table_snapshot(table.clone(), 
snapshot_id)
                 .await
                 .unwrap();
         let ctx = SessionContext::new();
@@ -304,16 +450,152 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_physical_input_schema_consistent_with_logical_input_schema() 
{
+    async fn test_static_provider_rejects_writes() {
+        let table = get_test_table_from_metadata_file().await;
+        let table_provider = 
IcebergStaticTableProvider::try_new_from_table(table.clone())
+            .await
+            .unwrap();
+        let ctx = SessionContext::new();
+        ctx.register_table("mytable", Arc::new(table_provider))
+            .unwrap();
+
+        // Attempt to insert into the static provider should fail
+        let result = ctx.sql("INSERT INTO mytable VALUES (1, 2, 3)").await;
+
+        // The error should occur during planning or execution
+        // We expect an error indicating write operations are not supported
+        assert!(
+            result.is_err() || {
+                let df = result.unwrap();
+                df.collect().await.is_err()
+            }
+        );
+    }
+
+    #[tokio::test]
+    async fn test_static_provider_scan() {
         let table = get_test_table_from_metadata_file().await;
-        let table_provider = 
IcebergTableProvider::try_new_from_table(table.clone())
+        let table_provider = 
IcebergStaticTableProvider::try_new_from_table(table.clone())
             .await
             .unwrap();
         let ctx = SessionContext::new();
         ctx.register_table("mytable", Arc::new(table_provider))
             .unwrap();
+
+        // Test that scan operations work correctly
         let df = ctx.sql("SELECT count(*) FROM mytable").await.unwrap();
         let physical_plan = df.create_physical_plan().await;
-        assert!(physical_plan.is_ok())
+        assert!(physical_plan.is_ok());
+    }
+
+    // Tests for IcebergTableProvider
+
+    #[tokio::test]
+    async fn test_catalog_backed_provider_creation() {
+        let (catalog, namespace, table_name, _temp_dir) = 
get_test_catalog_and_table().await;
+
+        // Test creating a catalog-backed provider
+        let provider =
+            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), 
table_name.clone())
+                .await
+                .unwrap();
+
+        // Verify the schema is loaded correctly
+        let schema = provider.schema();
+        assert_eq!(schema.fields().len(), 2);
+        assert_eq!(schema.field(0).name(), "id");
+        assert_eq!(schema.field(1).name(), "name");
+    }
+
+    #[tokio::test]
+    async fn test_catalog_backed_provider_scan() {
+        let (catalog, namespace, table_name, _temp_dir) = 
get_test_catalog_and_table().await;
+
+        let provider =
+            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), 
table_name.clone())
+                .await
+                .unwrap();
+
+        let ctx = SessionContext::new();
+        ctx.register_table("test_table", Arc::new(provider))
+            .unwrap();
+
+        // Test that scan operations work correctly
+        let df = ctx.sql("SELECT * FROM test_table").await.unwrap();
+
+        // Verify the schema in the query result
+        let df_schema = df.schema();
+        assert_eq!(df_schema.fields().len(), 2);
+        assert_eq!(df_schema.field(0).name(), "id");
+        assert_eq!(df_schema.field(1).name(), "name");
+
+        let physical_plan = df.create_physical_plan().await;
+        assert!(physical_plan.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_catalog_backed_provider_insert() {
+        let (catalog, namespace, table_name, _temp_dir) = 
get_test_catalog_and_table().await;
+
+        let provider =
+            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), 
table_name.clone())
+                .await
+                .unwrap();
+
+        let ctx = SessionContext::new();
+        ctx.register_table("test_table", Arc::new(provider))
+            .unwrap();
+
+        // Test that insert operations work correctly
+        let result = ctx.sql("INSERT INTO test_table VALUES (1, 
'test')").await;
+
+        // Insert should succeed (or at least not fail during planning)
+        assert!(result.is_ok());
+
+        // Try to execute the insert plan
+        let df = result.unwrap();
+        let execution_result = df.collect().await;
+
+        // The execution should succeed
+        assert!(execution_result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_physical_input_schema_consistent_with_logical_input_schema() 
{
+        let (catalog, namespace, table_name, _temp_dir) = 
get_test_catalog_and_table().await;
+
+        let provider =
+            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), 
table_name.clone())
+                .await
+                .unwrap();
+
+        let ctx = SessionContext::new();
+        ctx.register_table("test_table", Arc::new(provider))
+            .unwrap();
+
+        // Create a query plan
+        let df = ctx.sql("SELECT id, name FROM test_table").await.unwrap();
+
+        // Get logical schema before consuming df
+        let logical_schema = df.schema().clone();
+
+        // Get physical plan (this consumes df)
+        let physical_plan = df.create_physical_plan().await.unwrap();
+        let physical_schema = physical_plan.schema();
+
+        // Verify that logical and physical schemas are consistent
+        assert_eq!(
+            logical_schema.fields().len(),
+            physical_schema.fields().len()
+        );
+
+        for (logical_field, physical_field) in logical_schema
+            .fields()
+            .iter()
+            .zip(physical_schema.fields().iter())
+        {
+            assert_eq!(logical_field.name(), physical_field.name());
+            assert_eq!(logical_field.data_type(), physical_field.data_type());
+        }
     }
 }
diff --git a/crates/integrations/datafusion/src/table/table_provider_factory.rs 
b/crates/integrations/datafusion/src/table/table_provider_factory.rs
index e8e87dd31..8c0c8e90d 100644
--- a/crates/integrations/datafusion/src/table/table_provider_factory.rs
+++ b/crates/integrations/datafusion/src/table/table_provider_factory.rs
@@ -24,12 +24,11 @@ use datafusion::catalog::{Session, TableProvider, 
TableProviderFactory};
 use datafusion::error::Result as DFResult;
 use datafusion::logical_expr::CreateExternalTable;
 use datafusion::sql::TableReference;
-use iceberg::arrow::schema_to_arrow_schema;
 use iceberg::io::FileIO;
 use iceberg::table::StaticTable;
 use iceberg::{Error, ErrorKind, Result, TableIdent};
 
-use super::IcebergTableProvider;
+use super::IcebergStaticTableProvider;
 use crate::to_datafusion_error;
 
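-/// A factory that implements DataFusion's `TableProviderFactory` to create `IcebergTableProvider` instances.
+/// A factory that implements DataFusion's `TableProviderFactory` to create `IcebergStaticTableProvider` instances.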
@@ -126,10 +125,11 @@ impl TableProviderFactory for IcebergTableProviderFactory 
{
             .map_err(to_datafusion_error)?
             .into_table();
 
-        let schema = schema_to_arrow_schema(table.metadata().current_schema())
+        let provider = IcebergStaticTableProvider::try_new_from_table(table)
+            .await
             .map_err(to_datafusion_error)?;
 
-        Ok(Arc::new(IcebergTableProvider::new(table, Arc::new(schema))))
+        Ok(Arc::new(provider))
     }
 }
 
diff --git 
a/crates/integrations/datafusion/tests/integration_datafusion_test.rs 
b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
index fdf5b17d1..3ad84f383 100644
--- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs
+++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
@@ -492,10 +492,6 @@ async fn test_insert_into() -> Result<()> {
         .unwrap();
     assert_eq!(rows_inserted.value(0), 2);
 
-    // Refresh context to avoid getting stale table
-    let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
-    ctx.register_catalog("catalog", catalog);
-
     // Query the table to verify the inserted data
     let df = ctx
         .sql("SELECT * FROM catalog.test_insert_into.my_table")
@@ -650,10 +646,6 @@ async fn test_insert_into_nested() -> Result<()> {
         .unwrap();
     assert_eq!(rows_inserted.value(0), 2);
 
-    // Refresh context to avoid getting stale table
-    let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
-    ctx.register_catalog("catalog", catalog);
-
     // Query the table to verify the inserted data
     let df = ctx
         .sql("SELECT * FROM catalog.test_insert_nested.nested_table ORDER BY 
id")
@@ -880,10 +872,6 @@ async fn test_insert_into_partitioned() -> Result<()> {
         .unwrap();
     assert_eq!(rows_inserted.value(0), 5);
 
-    // Refresh catalog to get updated table
-    let catalog = 
Arc::new(IcebergCatalogProvider::try_new(client.clone()).await?);
-    ctx.register_catalog("catalog", catalog);
-
     // Query the table to verify data
     let df = ctx
         .sql("SELECT * FROM catalog.test_partitioned_write.partitioned_table 
ORDER BY id")

Reply via email to