This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new b8f088ee feat: implement IcebergTableProviderFactory for datafusion 
(#600)
b8f088ee is described below

commit b8f088eed6c79f4ec7e81b4b5f9df2aeb2663473
Author: yujie.zhang (he/him) <[email protected]>
AuthorDate: Fri Nov 1 21:04:41 2024 +0800

    feat: implement IcebergTableProviderFactory for datafusion (#600)
    
    * feat: implement IcebergTableProviderFactory for datafusion
    
    * fix comments
    
    * add doc&ut
    
    * remove print
    
    * fix comments
---
 crates/integrations/datafusion/src/lib.rs          |   1 +
 .../datafusion/src/{table.rs => table/mod.rs}      |   5 +
 .../datafusion/src/table/table_provider_factory.rs | 300 +++++++++++++++++++++
 .../testdata/table_metadata/TableMetadataV2.json   | 121 +++++++++
 4 files changed, 427 insertions(+)

diff --git a/crates/integrations/datafusion/src/lib.rs 
b/crates/integrations/datafusion/src/lib.rs
index b64f8fb8..b7b927fd 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -24,4 +24,5 @@ pub use error::*;
 mod physical_plan;
 mod schema;
 mod table;
+pub use table::table_provider_factory::IcebergTableProviderFactory;
 pub use table::*;
diff --git a/crates/integrations/datafusion/src/table.rs 
b/crates/integrations/datafusion/src/table/mod.rs
similarity index 97%
rename from crates/integrations/datafusion/src/table.rs
rename to crates/integrations/datafusion/src/table/mod.rs
index bb24713a..82f29bb5 100644
--- a/crates/integrations/datafusion/src/table.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod table_provider_factory;
+
 use std::any::Any;
 use std::sync::Arc;
 
@@ -41,6 +43,9 @@ pub struct IcebergTableProvider {
 }
 
 impl IcebergTableProvider {
+    pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
+        IcebergTableProvider { table, schema }
+    }
     /// Asynchronously tries to construct a new [`IcebergTableProvider`]
     /// using the given client and table name to fetch an actual [`Table`]
     /// in the provided namespace.
diff --git a/crates/integrations/datafusion/src/table/table_provider_factory.rs 
b/crates/integrations/datafusion/src/table/table_provider_factory.rs
new file mode 100644
index 00000000..b8e66bd3
--- /dev/null
+++ b/crates/integrations/datafusion/src/table/table_provider_factory.rs
@@ -0,0 +1,300 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::catalog::{Session, TableProvider, TableProviderFactory};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::CreateExternalTable;
+use datafusion::sql::TableReference;
+use iceberg::arrow::schema_to_arrow_schema;
+use iceberg::io::FileIO;
+use iceberg::table::StaticTable;
+use iceberg::{Error, ErrorKind, Result, TableIdent};
+
+use super::IcebergTableProvider;
+use crate::to_datafusion_error;
+
+/// A factory that implements DataFusion's `TableProviderFactory` to create 
`IcebergTableProvider` instances.
+///
+/// # Example
+///
+/// The following example demonstrates how to create an Iceberg external table 
using SQL in
+/// a DataFusion session with `IcebergTableProviderFactory`:
+///
+/// ```
+/// use std::sync::Arc;
+///
+/// use datafusion::execution::session_state::SessionStateBuilder;
+/// use datafusion::prelude::*;
+/// use datafusion::sql::TableReference;
+/// use iceberg_datafusion::IcebergTableProviderFactory;
+///
+/// #[tokio::main]
+/// async fn main() {
+///     // Create a new session context
+///     let mut state = 
SessionStateBuilder::new().with_default_features().build();
+///
+///     // Register the IcebergTableProviderFactory in the session
+///     state.table_factories_mut().insert(
+///         "ICEBERG".to_string(),
+///         Arc::new(IcebergTableProviderFactory::new()),
+///     );
+///
+///     let ctx = SessionContext::new_with_state(state);
+///
+///     // Define the table reference and the location of the Iceberg metadata 
file
+///     let table_ref = TableReference::bare("my_iceberg_table");
+///     // /path/to/iceberg/metadata
+///     let metadata_file_path = format!(
+///         "{}/testdata/table_metadata/{}",
+///         env!("CARGO_MANIFEST_DIR"),
+///         "TableMetadataV2.json"
+///     );
+///
+///     // SQL command to create the Iceberg external table
+///     let sql = format!(
+///         "CREATE EXTERNAL TABLE {} STORED AS ICEBERG LOCATION '{}'",
+///         table_ref, metadata_file_path
+///     );
+///
+///     // Execute the SQL to create the external table
+///     ctx.sql(&sql).await.expect("Failed to create table");
+///
+///     // Verify the table was created by retrieving the table provider
+///     let table_provider = ctx
+///         .table_provider(table_ref)
+///         .await
+///         .expect("Table not found");
+///
+///     println!("Iceberg external table created successfully.");
+/// }
+/// ```
+///
+/// # Note
+/// This factory is designed to work with the DataFusion query engine,
+/// specifically for handling Iceberg tables in external table commands.
+/// Currently, this implementation supports only reading Iceberg tables, with
+/// the creation of new tables not yet available.
+///
+/// # Errors
+/// An error will be returned if any unsupported feature, such as partition 
columns,
+/// order expressions, constraints, or column defaults, is detected in the 
table creation command.
+#[derive(Default)]
+pub struct IcebergTableProviderFactory {}
+
+impl IcebergTableProviderFactory {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+#[async_trait]
+impl TableProviderFactory for IcebergTableProviderFactory {
+    async fn create(
+        &self,
+        _state: &dyn Session,
+        cmd: &CreateExternalTable,
+    ) -> DFResult<Arc<dyn TableProvider>> {
+        check_cmd(cmd).map_err(to_datafusion_error)?;
+
+        let table_name = &cmd.name;
+        let metadata_file_path = &cmd.location;
+        let options = &cmd.options;
+
+        let table_name_with_ns = complement_namespace_if_necessary(table_name);
+
+        let table = create_static_table(table_name_with_ns, 
metadata_file_path, options)
+            .await
+            .map_err(to_datafusion_error)?
+            .into_table();
+
+        let schema = schema_to_arrow_schema(table.metadata().current_schema())
+            .map_err(to_datafusion_error)?;
+
+        Ok(Arc::new(IcebergTableProvider::new(table, Arc::new(schema))))
+    }
+}
+
+fn check_cmd(cmd: &CreateExternalTable) -> Result<()> {
+    let CreateExternalTable {
+        schema,
+        table_partition_cols,
+        order_exprs,
+        constraints,
+        column_defaults,
+        ..
+    } = cmd;
+
+    // Check if any of the fields violate the constraints in a single condition
+    let is_invalid = !schema.fields().is_empty()
+        || !table_partition_cols.is_empty()
+        || !order_exprs.is_empty()
+        || !constraints.is_empty()
+        || !column_defaults.is_empty();
+
+    if is_invalid {
+        return Err(Error::new(ErrorKind::FeatureUnsupported, "Currently we 
only support reading existing iceberg tables in external table command. To 
create a new table, please use catalog provider."));
+    }
+
+    Ok(())
+}
+
+/// Complements the namespace of a table name if necessary.
+///
+/// # Note
+/// If the table name is a bare name, it will be complemented with the 
'default' namespace.
+/// Otherwise, it will be returned as is. Because Iceberg tables are always 
namespaced but DataFusion
+/// external table commands may not include the namespace, this function 
ensures that the namespace is always present.
+///
+/// # See also
+/// - [`iceberg::NamespaceIdent`]
+/// - [`datafusion::sql::planner::SqlToRel::external_table_to_plan`]
+fn complement_namespace_if_necessary(table_name: &TableReference) -> Cow<'_, 
TableReference> {
+    match table_name {
+        TableReference::Bare { table } => {
+            Cow::Owned(TableReference::partial("default", table.as_ref()))
+        }
+        other => Cow::Borrowed(other),
+    }
+}
+
+async fn create_static_table(
+    table_name: Cow<'_, TableReference>,
+    metadata_file_path: &str,
+    props: &HashMap<String, String>,
+) -> Result<StaticTable> {
+    let table_ident = TableIdent::from_strs(table_name.to_vec())?;
+    let file_io = FileIO::from_path(metadata_file_path)?
+        .with_props(props)
+        .build()?;
+    StaticTable::from_metadata_file(metadata_file_path, table_ident, 
file_io).await
+}
+
+#[cfg(test)]
+mod tests {
+
+    use datafusion::arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::catalog::TableProviderFactory;
+    use datafusion::common::{Constraints, DFSchema};
+    use datafusion::execution::session_state::SessionStateBuilder;
+    use datafusion::logical_expr::CreateExternalTable;
+    use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+    use datafusion::prelude::SessionContext;
+    use datafusion::sql::TableReference;
+
+    use super::*;
+
+    fn table_metadata_v2_schema() -> Schema {
+        Schema::new(vec![
+            Field::new("x", DataType::Int64, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("y", DataType::Int64, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+            Field::new("z", DataType::Int64, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "3".to_string(),
+            )])),
+        ])
+    }
+
+    fn table_metadata_location() -> String {
+        format!(
+            "{}/testdata/table_metadata/{}",
+            env!("CARGO_MANIFEST_DIR"),
+            "TableMetadataV2.json"
+        )
+    }
+
+    fn create_external_table_cmd() -> CreateExternalTable {
+        let metadata_file_path = table_metadata_location();
+
+        CreateExternalTable {
+            name: TableReference::partial("static_ns", "static_table"),
+            location: metadata_file_path,
+            schema: Arc::new(DFSchema::empty()),
+            file_type: "iceberg".to_string(),
+            options: Default::default(),
+            table_partition_cols: Default::default(),
+            order_exprs: Default::default(),
+            constraints: Constraints::empty(),
+            column_defaults: Default::default(),
+            if_not_exists: Default::default(),
+            definition: Default::default(),
+            unbounded: Default::default(),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_schema_of_created_table() {
+        let factory = IcebergTableProviderFactory::new();
+
+        let state = SessionStateBuilder::new().build();
+        let cmd = create_external_table_cmd();
+
+        let table_provider = factory
+            .create(&state, &cmd)
+            .await
+            .expect("create table failed");
+
+        let expected_schema = table_metadata_v2_schema();
+        let actual_schema = table_provider.schema();
+
+        assert_eq!(actual_schema.as_ref(), &expected_schema);
+    }
+
+    #[tokio::test]
+    async fn test_schema_of_created_external_table_sql() {
+        let mut state = 
SessionStateBuilder::new().with_default_features().build();
+        state.table_factories_mut().insert(
+            "ICEBERG".to_string(),
+            Arc::new(IcebergTableProviderFactory::new()),
+        );
+        let ctx = SessionContext::new_with_state(state);
+
+        // All external tables in DataFusion use bare names.
+        // See 
https://github.com/apache/datafusion/blob/main/datafusion/sql/src/statement.rs#L1038-#L1039
+        let table_ref = TableReference::bare("static_table");
+
+        // Create the external table
+        let sql = format!(
+            "CREATE EXTERNAL TABLE {} STORED AS ICEBERG LOCATION '{}'",
+            table_ref,
+            table_metadata_location()
+        );
+        let _df = ctx.sql(&sql).await.expect("create table failed");
+
+        // Get the created external table
+        let table_provider = ctx
+            .table_provider(table_ref)
+            .await
+            .expect("table not found");
+
+        // Check the schema of the created table
+        let expected_schema = table_metadata_v2_schema();
+        let actual_schema = table_provider.schema();
+
+        assert_eq!(actual_schema.as_ref(), &expected_schema);
+    }
+}
diff --git 
a/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2.json 
b/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2.json
new file mode 100644
index 00000000..a7b47217
--- /dev/null
+++ 
b/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2.json
@@ -0,0 +1,121 @@
+{
+  "format-version": 2,
+  "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+  "location": "s3://bucket/test/location",
+  "last-sequence-number": 34,
+  "last-updated-ms": 1602638573590,
+  "last-column-id": 3,
+  "current-schema-id": 1,
+  "schemas": [
+    {
+      "type": "struct",
+      "schema-id": 0,
+      "fields": [
+        {
+          "id": 1,
+          "name": "x",
+          "required": true,
+          "type": "long"
+        }
+      ]
+    },
+    {
+      "type": "struct",
+      "schema-id": 1,
+      "identifier-field-ids": [
+        1,
+        2
+      ],
+      "fields": [
+        {
+          "id": 1,
+          "name": "x",
+          "required": true,
+          "type": "long"
+        },
+        {
+          "id": 2,
+          "name": "y",
+          "required": true,
+          "type": "long"
+        },
+        {
+          "id": 3,
+          "name": "z",
+          "required": true,
+          "type": "long"
+        }
+      ]
+    }
+  ],
+  "default-spec-id": 0,
+  "partition-specs": [
+    {
+      "spec-id": 0,
+      "fields": [
+        {
+          "name": "x",
+          "transform": "identity",
+          "source-id": 1,
+          "field-id": 1000
+        }
+      ]
+    }
+  ],
+  "last-partition-id": 1000,
+  "default-sort-order-id": 3,
+  "sort-orders": [
+    {
+      "order-id": 3,
+      "fields": [
+        {
+          "transform": "identity",
+          "source-id": 2,
+          "direction": "asc",
+          "null-order": "nulls-first"
+        },
+        {
+          "transform": "bucket[4]",
+          "source-id": 3,
+          "direction": "desc",
+          "null-order": "nulls-last"
+        }
+      ]
+    }
+  ],
+  "properties": {},
+  "current-snapshot-id": 3055729675574597004,
+  "snapshots": [
+    {
+      "snapshot-id": 3051729675574597004,
+      "timestamp-ms": 1515100955770,
+      "sequence-number": 0,
+      "summary": {
+        "operation": "append"
+      },
+      "manifest-list": "s3://a/b/1.avro"
+    },
+    {
+      "snapshot-id": 3055729675574597004,
+      "parent-snapshot-id": 3051729675574597004,
+      "timestamp-ms": 1555100955770,
+      "sequence-number": 1,
+      "summary": {
+        "operation": "append"
+      },
+      "manifest-list": "s3://a/b/2.avro",
+      "schema-id": 1
+    }
+  ],
+  "snapshot-log": [
+    {
+      "snapshot-id": 3051729675574597004,
+      "timestamp-ms": 1515100955770
+    },
+    {
+      "snapshot-id": 3055729675574597004,
+      "timestamp-ms": 1555100955770
+    }
+  ],
+  "metadata-log": []
+}
\ No newline at end of file

Reply via email to