liurenjie1024 commented on code in PR #1972:
URL: https://github.com/apache/iceberg-rust/pull/1972#discussion_r2680683700
##########
crates/integrations/datafusion/src/schema.rs:
##########
@@ -127,7 +139,61 @@ impl SchemaProvider for IcebergSchemaProvider {
Ok(self
.tables
.get(name)
- .cloned()
- .map(|t| t as Arc<dyn TableProvider>))
+ .map(|entry| entry.value().clone() as Arc<dyn TableProvider>))
+ }
+
+ fn register_table(
+ &self,
+ name: String,
+ table: Arc<dyn TableProvider>,
+ ) -> DFResult<Option<Arc<dyn TableProvider>>> {
+ // Convert DataFusion schema to Iceberg schema
+ // DataFusion schemas don't have field IDs, so we use the function
that assigns them automatically
+ let df_schema = table.schema();
+ let iceberg_schema =
arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref())
+ .map_err(to_datafusion_error)?;
+
+ // Create the table in the Iceberg catalog
+ let table_creation = TableCreation::builder()
+ .name(name.clone())
+ .schema(iceberg_schema)
Review Comment:
We ignore the data in the CREATE TABLE statement; I think that's fine for now.
But we should return an error when the statement contains data.
##########
crates/integrations/datafusion/src/schema.rs:
##########
@@ -127,7 +139,61 @@ impl SchemaProvider for IcebergSchemaProvider {
Ok(self
.tables
.get(name)
- .cloned()
- .map(|t| t as Arc<dyn TableProvider>))
+ .map(|entry| entry.value().clone() as Arc<dyn TableProvider>))
+ }
+
+ fn register_table(
+ &self,
+ name: String,
+ table: Arc<dyn TableProvider>,
+ ) -> DFResult<Option<Arc<dyn TableProvider>>> {
+ // Convert DataFusion schema to Iceberg schema
+ // DataFusion schemas don't have field IDs, so we use the function
that assigns them automatically
+ let df_schema = table.schema();
+ let iceberg_schema =
arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref())
+ .map_err(to_datafusion_error)?;
+
+ // Create the table in the Iceberg catalog
+ let table_creation = TableCreation::builder()
+ .name(name.clone())
+ .schema(iceberg_schema)
+ .build();
+
+ let catalog = self.catalog.clone();
+ let namespace = self.namespace.clone();
+ let tables = self.tables.clone();
+ let name_clone = name.clone();
+
+ // Use tokio's spawn_blocking to handle the async work on a blocking
thread pool
+ let result = tokio::task::spawn_blocking(move || {
Review Comment:
Why do we first call spawn_blocking and then call block_on inside it? I think we
just need the following:
```
let rt = Handle::current();
rt.block_on()?
```
##########
crates/integrations/datafusion/src/schema.rs:
##########
@@ -127,7 +139,61 @@ impl SchemaProvider for IcebergSchemaProvider {
Ok(self
.tables
.get(name)
- .cloned()
- .map(|t| t as Arc<dyn TableProvider>))
+ .map(|entry| entry.value().clone() as Arc<dyn TableProvider>))
+ }
+
+ fn register_table(
+ &self,
+ name: String,
+ table: Arc<dyn TableProvider>,
+ ) -> DFResult<Option<Arc<dyn TableProvider>>> {
+ // Convert DataFusion schema to Iceberg schema
+ // DataFusion schemas don't have field IDs, so we use the function
that assigns them automatically
+ let df_schema = table.schema();
+ let iceberg_schema =
arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref())
+ .map_err(to_datafusion_error)?;
+
+ // Create the table in the Iceberg catalog
+ let table_creation = TableCreation::builder()
+ .name(name.clone())
+ .schema(iceberg_schema)
+ .build();
+
+ let catalog = self.catalog.clone();
+ let namespace = self.namespace.clone();
+ let tables = self.tables.clone();
+ let name_clone = name.clone();
+
+ // Use tokio's spawn_blocking to handle the async work on a blocking
thread pool
+ let result = tokio::task::spawn_blocking(move || {
+ // Create a new runtime handle to execute the async work
+ let rt = tokio::runtime::Handle::current();
+ rt.block_on(async move {
+ catalog
+ .create_table(&namespace, table_creation)
+ .await
+ .map_err(to_datafusion_error)?;
+
+ // Create a new table provider using the catalog reference
+ let table_provider = IcebergTableProvider::try_new(
+ catalog.clone(),
+ namespace.clone(),
+ name_clone.clone(),
+ )
+ .await
+ .map_err(to_datafusion_error)?;
+
+ // Store the new table provider
+ let old_table = tables.insert(name_clone,
Arc::new(table_provider));
Review Comment:
According to the docs, we should return an error when the table already exists:
https://github.com/apache/datafusion/blob/f9697c14e29babc961c074eaec008e747495a636/datafusion/catalog/src/schema.rs#L72
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]