This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new b8f088ee feat: implement IcebergTableProviderFactory for datafusion
(#600)
b8f088ee is described below
commit b8f088eed6c79f4ec7e81b4b5f9df2aeb2663473
Author: yujie.zhang (he/him) <[email protected]>
AuthorDate: Fri Nov 1 21:04:41 2024 +0800
feat: implement IcebergTableProviderFactory for datafusion (#600)
* feat: implement IcebergTableProviderFactory for datafusion
* fix comments
* add doc&ut
* remove print
* fix comments
---
crates/integrations/datafusion/src/lib.rs | 1 +
.../datafusion/src/{table.rs => table/mod.rs} | 5 +
.../datafusion/src/table/table_provider_factory.rs | 300 +++++++++++++++++++++
.../testdata/table_metadata/TableMetadataV2.json | 121 +++++++++
4 files changed, 427 insertions(+)
diff --git a/crates/integrations/datafusion/src/lib.rs
b/crates/integrations/datafusion/src/lib.rs
index b64f8fb8..b7b927fd 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -24,4 +24,5 @@ pub use error::*;
mod physical_plan;
mod schema;
mod table;
+pub use table::table_provider_factory::IcebergTableProviderFactory;
pub use table::*;
diff --git a/crates/integrations/datafusion/src/table.rs
b/crates/integrations/datafusion/src/table/mod.rs
similarity index 97%
rename from crates/integrations/datafusion/src/table.rs
rename to crates/integrations/datafusion/src/table/mod.rs
index bb24713a..82f29bb5 100644
--- a/crates/integrations/datafusion/src/table.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+pub mod table_provider_factory;
+
use std::any::Any;
use std::sync::Arc;
@@ -41,6 +43,9 @@ pub struct IcebergTableProvider {
}
impl IcebergTableProvider {
+ pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
+ IcebergTableProvider { table, schema }
+ }
/// Asynchronously tries to construct a new [`IcebergTableProvider`]
/// using the given client and table name to fetch an actual [`Table`]
/// in the provided namespace.
diff --git a/crates/integrations/datafusion/src/table/table_provider_factory.rs
b/crates/integrations/datafusion/src/table/table_provider_factory.rs
new file mode 100644
index 00000000..b8e66bd3
--- /dev/null
+++ b/crates/integrations/datafusion/src/table/table_provider_factory.rs
@@ -0,0 +1,300 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::catalog::{Session, TableProvider, TableProviderFactory};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::CreateExternalTable;
+use datafusion::sql::TableReference;
+use iceberg::arrow::schema_to_arrow_schema;
+use iceberg::io::FileIO;
+use iceberg::table::StaticTable;
+use iceberg::{Error, ErrorKind, Result, TableIdent};
+
+use super::IcebergTableProvider;
+use crate::to_datafusion_error;
+
+/// A factory that implements DataFusion's `TableProviderFactory` to create
`IcebergTableProvider` instances.
+///
+/// # Example
+///
+/// The following example demonstrates how to create an Iceberg external table
using SQL in
+/// a DataFusion session with `IcebergTableProviderFactory`:
+///
+/// ```
+/// use std::sync::Arc;
+///
+/// use datafusion::execution::session_state::SessionStateBuilder;
+/// use datafusion::prelude::*;
+/// use datafusion::sql::TableReference;
+/// use iceberg_datafusion::IcebergTableProviderFactory;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// // Create a new session context
+/// let mut state =
SessionStateBuilder::new().with_default_features().build();
+///
+/// // Register the IcebergTableProviderFactory in the session
+/// state.table_factories_mut().insert(
+/// "ICEBERG".to_string(),
+/// Arc::new(IcebergTableProviderFactory::new()),
+/// );
+///
+/// let ctx = SessionContext::new_with_state(state);
+///
+/// // Define the table reference and the location of the Iceberg metadata
file
+/// let table_ref = TableReference::bare("my_iceberg_table");
+/// // /path/to/iceberg/metadata
+/// let metadata_file_path = format!(
+/// "{}/testdata/table_metadata/{}",
+/// env!("CARGO_MANIFEST_DIR"),
+/// "TableMetadataV2.json"
+/// );
+///
+/// // SQL command to create the Iceberg external table
+/// let sql = format!(
+/// "CREATE EXTERNAL TABLE {} STORED AS ICEBERG LOCATION '{}'",
+/// table_ref, metadata_file_path
+/// );
+///
+/// // Execute the SQL to create the external table
+/// ctx.sql(&sql).await.expect("Failed to create table");
+///
+/// // Verify the table was created by retrieving the table provider
+/// let table_provider = ctx
+/// .table_provider(table_ref)
+/// .await
+/// .expect("Table not found");
+///
+/// println!("Iceberg external table created successfully.");
+/// }
+/// ```
+///
+/// # Note
+/// This factory is designed to work with the DataFusion query engine,
+/// specifically for handling Iceberg tables in external table commands.
+/// Currently, this implementation supports only reading Iceberg tables, with
+/// the creation of new tables not yet available.
+///
+/// # Errors
+/// An error will be returned if any unsupported feature, such as partition
columns,
+/// order expressions, constraints, or column defaults, is detected in the
table creation command.
+#[derive(Default)]
+pub struct IcebergTableProviderFactory {}
+
+impl IcebergTableProviderFactory {
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+#[async_trait]
+impl TableProviderFactory for IcebergTableProviderFactory {
+ async fn create(
+ &self,
+ _state: &dyn Session,
+ cmd: &CreateExternalTable,
+ ) -> DFResult<Arc<dyn TableProvider>> {
+ check_cmd(cmd).map_err(to_datafusion_error)?;
+
+ let table_name = &cmd.name;
+ let metadata_file_path = &cmd.location;
+ let options = &cmd.options;
+
+ let table_name_with_ns = complement_namespace_if_necessary(table_name);
+
+ let table = create_static_table(table_name_with_ns,
metadata_file_path, options)
+ .await
+ .map_err(to_datafusion_error)?
+ .into_table();
+
+ let schema = schema_to_arrow_schema(table.metadata().current_schema())
+ .map_err(to_datafusion_error)?;
+
+ Ok(Arc::new(IcebergTableProvider::new(table, Arc::new(schema))))
+ }
+}
+
+fn check_cmd(cmd: &CreateExternalTable) -> Result<()> {
+ let CreateExternalTable {
+ schema,
+ table_partition_cols,
+ order_exprs,
+ constraints,
+ column_defaults,
+ ..
+ } = cmd;
+
+ // Check if any of the fields violate the constraints in a single condition
+ let is_invalid = !schema.fields().is_empty()
+ || !table_partition_cols.is_empty()
+ || !order_exprs.is_empty()
+ || !constraints.is_empty()
+ || !column_defaults.is_empty();
+
+ if is_invalid {
+ return Err(Error::new(ErrorKind::FeatureUnsupported, "Currently we
only support reading existing iceberg tables in external table command. To
create new table, please use catalog provider."));
+ }
+
+ Ok(())
+}
+
+/// Complements the namespace of a table name if necessary.
+///
+/// # Note
+/// If the table name is a bare name, it will be complemented with the
'default' namespace.
+/// Otherwise, it will be returned as is. Because Iceberg tables are always
namespaced, but DataFusion
+/// external table commands may not include the namespace, this function
ensures that the namespace is always present.
+///
+/// # See also
+/// - [`iceberg::NamespaceIdent`]
+/// - [`datafusion::sql::planner::SqlToRel::external_table_to_plan`]
+fn complement_namespace_if_necessary(table_name: &TableReference) -> Cow<'_,
TableReference> {
+ match table_name {
+ TableReference::Bare { table } => {
+ Cow::Owned(TableReference::partial("default", table.as_ref()))
+ }
+ other => Cow::Borrowed(other),
+ }
+}
+
+async fn create_static_table(
+ table_name: Cow<'_, TableReference>,
+ metadata_file_path: &str,
+ props: &HashMap<String, String>,
+) -> Result<StaticTable> {
+ let table_ident = TableIdent::from_strs(table_name.to_vec())?;
+ let file_io = FileIO::from_path(metadata_file_path)?
+ .with_props(props)
+ .build()?;
+ StaticTable::from_metadata_file(metadata_file_path, table_ident,
file_io).await
+}
+
+#[cfg(test)]
+mod tests {
+
+ use datafusion::arrow::datatypes::{DataType, Field, Schema};
+ use datafusion::catalog::TableProviderFactory;
+ use datafusion::common::{Constraints, DFSchema};
+ use datafusion::execution::session_state::SessionStateBuilder;
+ use datafusion::logical_expr::CreateExternalTable;
+ use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+ use datafusion::prelude::SessionContext;
+ use datafusion::sql::TableReference;
+
+ use super::*;
+
+ fn table_metadata_v2_schema() -> Schema {
+ Schema::new(vec![
+ Field::new("x", DataType::Int64,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "1".to_string(),
+ )])),
+ Field::new("y", DataType::Int64,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "2".to_string(),
+ )])),
+ Field::new("z", DataType::Int64,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "3".to_string(),
+ )])),
+ ])
+ }
+
+ fn table_metadata_location() -> String {
+ format!(
+ "{}/testdata/table_metadata/{}",
+ env!("CARGO_MANIFEST_DIR"),
+ "TableMetadataV2.json"
+ )
+ }
+
+ fn create_external_table_cmd() -> CreateExternalTable {
+ let metadata_file_path = table_metadata_location();
+
+ CreateExternalTable {
+ name: TableReference::partial("static_ns", "static_table"),
+ location: metadata_file_path,
+ schema: Arc::new(DFSchema::empty()),
+ file_type: "iceberg".to_string(),
+ options: Default::default(),
+ table_partition_cols: Default::default(),
+ order_exprs: Default::default(),
+ constraints: Constraints::empty(),
+ column_defaults: Default::default(),
+ if_not_exists: Default::default(),
+ definition: Default::default(),
+ unbounded: Default::default(),
+ }
+ }
+
+ #[tokio::test]
+ async fn test_schema_of_created_table() {
+ let factory = IcebergTableProviderFactory::new();
+
+ let state = SessionStateBuilder::new().build();
+ let cmd = create_external_table_cmd();
+
+ let table_provider = factory
+ .create(&state, &cmd)
+ .await
+ .expect("create table failed");
+
+ let expected_schema = table_metadata_v2_schema();
+ let actual_schema = table_provider.schema();
+
+ assert_eq!(actual_schema.as_ref(), &expected_schema);
+ }
+
+ #[tokio::test]
+ async fn test_schema_of_created_external_table_sql() {
+ let mut state =
SessionStateBuilder::new().with_default_features().build();
+ state.table_factories_mut().insert(
+ "ICEBERG".to_string(),
+ Arc::new(IcebergTableProviderFactory::new()),
+ );
+ let ctx = SessionContext::new_with_state(state);
+
+ // All external tables in DataFusion use bare names.
+ // See
https://github.com/apache/datafusion/blob/main/datafusion/sql/src/statement.rs#L1038-#L1039
+ let table_ref = TableReference::bare("static_table");
+
+ // Create the external table
+ let sql = format!(
+ "CREATE EXTERNAL TABLE {} STORED AS ICEBERG LOCATION '{}'",
+ table_ref,
+ table_metadata_location()
+ );
+ let _df = ctx.sql(&sql).await.expect("create table failed");
+
+ // Get the created external table
+ let table_provider = ctx
+ .table_provider(table_ref)
+ .await
+ .expect("table not found");
+
+ // Check the schema of the created table
+ let expected_schema = table_metadata_v2_schema();
+ let actual_schema = table_provider.schema();
+
+ assert_eq!(actual_schema.as_ref(), &expected_schema);
+ }
+}
diff --git
a/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2.json
b/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2.json
new file mode 100644
index 00000000..a7b47217
--- /dev/null
+++
b/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2.json
@@ -0,0 +1,121 @@
+{
+ "format-version": 2,
+ "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+ "location": "s3://bucket/test/location",
+ "last-sequence-number": 34,
+ "last-updated-ms": 1602638573590,
+ "last-column-id": 3,
+ "current-schema-id": 1,
+ "schemas": [
+ {
+ "type": "struct",
+ "schema-id": 0,
+ "fields": [
+ {
+ "id": 1,
+ "name": "x",
+ "required": true,
+ "type": "long"
+ }
+ ]
+ },
+ {
+ "type": "struct",
+ "schema-id": 1,
+ "identifier-field-ids": [
+ 1,
+ 2
+ ],
+ "fields": [
+ {
+ "id": 1,
+ "name": "x",
+ "required": true,
+ "type": "long"
+ },
+ {
+ "id": 2,
+ "name": "y",
+ "required": true,
+ "type": "long"
+ },
+ {
+ "id": 3,
+ "name": "z",
+ "required": true,
+ "type": "long"
+ }
+ ]
+ }
+ ],
+ "default-spec-id": 0,
+ "partition-specs": [
+ {
+ "spec-id": 0,
+ "fields": [
+ {
+ "name": "x",
+ "transform": "identity",
+ "source-id": 1,
+ "field-id": 1000
+ }
+ ]
+ }
+ ],
+ "last-partition-id": 1000,
+ "default-sort-order-id": 3,
+ "sort-orders": [
+ {
+ "order-id": 3,
+ "fields": [
+ {
+ "transform": "identity",
+ "source-id": 2,
+ "direction": "asc",
+ "null-order": "nulls-first"
+ },
+ {
+ "transform": "bucket[4]",
+ "source-id": 3,
+ "direction": "desc",
+ "null-order": "nulls-last"
+ }
+ ]
+ }
+ ],
+ "properties": {},
+ "current-snapshot-id": 3055729675574597004,
+ "snapshots": [
+ {
+ "snapshot-id": 3051729675574597004,
+ "timestamp-ms": 1515100955770,
+ "sequence-number": 0,
+ "summary": {
+ "operation": "append"
+ },
+ "manifest-list": "s3://a/b/1.avro"
+ },
+ {
+ "snapshot-id": 3055729675574597004,
+ "parent-snapshot-id": 3051729675574597004,
+ "timestamp-ms": 1555100955770,
+ "sequence-number": 1,
+ "summary": {
+ "operation": "append"
+ },
+ "manifest-list": "s3://a/b/2.avro",
+ "schema-id": 1
+ }
+ ],
+ "snapshot-log": [
+ {
+ "snapshot-id": 3051729675574597004,
+ "timestamp-ms": 1515100955770
+ },
+ {
+ "snapshot-id": 3055729675574597004,
+ "timestamp-ms": 1555100955770
+ }
+ ],
+ "metadata-log": []
+}
\ No newline at end of file