This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new abb3e63 feat: implement datafusion `TableProviderFactory` (#162)
abb3e63 is described below
commit abb3e635a28af229ba1e872fd6005c381e2b7ded
Author: kazdy <[email protected]>
AuthorDate: Mon Oct 14 23:33:30 2024 +0200
feat: implement datafusion `TableProviderFactory` (#162)
This allows DataFusion users to register an external Hudi table and query
it as follows:
```sql
CREATE EXTERNAL TABLE trips STORED AS HUDI LOCATION 'path';
SELECT * FROM trips;
```
---------
Co-authored-by: Shiyan Xu <[email protected]>
---
crates/datafusion/src/lib.rs | 168 ++++++++++++++++++++++++++++++++++++-------
1 file changed, 141 insertions(+), 27 deletions(-)
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index b090696..9ef4070 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -18,13 +18,14 @@
*/
use std::any::Any;
+use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::thread;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
-use datafusion::catalog::Session;
+use datafusion::catalog::{Session, TableProviderFactory};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
@@ -35,7 +36,7 @@ use datafusion_common::config::TableParquetOptions;
use datafusion_common::DFSchema;
use datafusion_common::DataFusionError::Execution;
use datafusion_common::Result;
-use datafusion_expr::{Expr, TableType};
+use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_expr::create_physical_expr;
use hudi_core::config::read::HudiReadConfig::InputPartitions;
@@ -150,14 +151,63 @@ impl TableProvider for HudiDataSource {
}
}
+/// DataFusion [`TableProviderFactory`] that builds [`HudiDataSource`] tables.
+///
+/// Registered under a file type such as `HUDI`, it lets SQL statements like
+/// `CREATE EXTERNAL TABLE t STORED AS HUDI LOCATION '...'` create Hudi tables.
+#[derive(Debug)]
+pub struct HudiTableFactory {}
+
+impl Default for HudiTableFactory {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl HudiTableFactory {
+    /// Creates a new factory. The factory holds no state.
+    pub fn new() -> Self {
+        Self {}
+    }
+
+    /// Merges the session's config entries with the `OPTIONS (...)` given in the
+    /// `CREATE EXTERNAL TABLE` command into a single key/value map.
+    ///
+    /// Session entries whose value is unset or empty are skipped. When a key is
+    /// present in both sources, the value from the command wins.
+    fn resolve_options(
+        state: &dyn Session,
+        cmd: &CreateExternalTable,
+    ) -> Result<HashMap<String, String>> {
+        let mut options: HashMap<_, _> = state
+            .config_options()
+            .entries()
+            .iter()
+            .filter_map(|e| {
+                let value = e.value.as_ref().filter(|v| !v.is_empty())?;
+                Some((e.key.clone(), value.clone()))
+            })
+            .collect();
+
+        // options from the command take precedence
+        options.extend(cmd.options.iter().map(|(k, v)| (k.clone(), v.clone())));
+
+        Ok(options)
+    }
+}
+
+#[async_trait]
+impl TableProviderFactory for HudiTableFactory {
+    /// Creates a [`HudiDataSource`] rooted at `cmd.location`, configured with the
+    /// session options merged with the command's `OPTIONS`.
+    ///
+    /// # Errors
+    /// Returns any error raised while constructing the [`HudiDataSource`].
+    async fn create(
+        &self,
+        state: &dyn Session,
+        cmd: &CreateExternalTable,
+    ) -> Result<Arc<dyn TableProvider>> {
+        let options = Self::resolve_options(state, cmd)?;
+        let base_uri = cmd.location.as_str();
+        let table_provider = HudiDataSource::new_with_options(base_uri, options).await?;
+        Ok(Arc::new(table_provider))
+    }
+}
+
#[cfg(test)]
mod tests {
+ use super::*;
+ use datafusion::execution::session_state::SessionStateBuilder;
+ use datafusion::prelude::{SessionConfig, SessionContext};
+ use datafusion_common::{DataFusionError, ScalarValue};
use std::fs::canonicalize;
use std::path::Path;
use std::sync::Arc;
-
- use datafusion::prelude::{SessionConfig, SessionContext};
- use datafusion_common::ScalarValue;
use url::Url;
use hudi_core::config::read::HudiReadConfig::InputPartitions;
@@ -170,6 +220,7 @@ mod tests {
use utils::{get_bool_column, get_i32_column, get_str_column};
use crate::HudiDataSource;
+ use crate::HudiTableFactory;
#[tokio::test]
async fn get_default_input_partitions() {
@@ -180,22 +231,81 @@ mod tests {
assert_eq!(hudi.get_input_partitions(), 0)
}
- async fn prepare_session_context(
+ /// Creates a test session (via `create_test_session`) and registers `test_table`
+ /// in it under the table's name, passing `options` through to the Hudi table.
+ ///
+ /// With `use_sql == true` the table is created by executing
+ /// `CREATE EXTERNAL TABLE ... STORED AS HUDI`, which goes through the
+ /// `HudiTableFactory` registered by `create_test_session`. Otherwise a
+ /// `HudiDataSource` is built from the table URL and registered directly.
+ ///
+ /// NOTE(review): the SQL path interpolates `test_table.path()` into the
+ /// statement without escaping, so a location containing `'` would break it.
+ ///
+ /// # Errors
+ /// Propagates errors from executing the SQL or from building/registering the table.
+ async fn register_test_table_with_session<I, K, V>(
test_table: &TestTable,
- options: Vec<(&str, &str)>,
- ) -> SessionContext {
+ options: I,
+ use_sql: bool,
+ ) -> Result<SessionContext, DataFusionError>
+ where
+ I: IntoIterator<Item = (K, V)>,
+ K: AsRef<str>,
+ V: Into<String>,
+ {
+ let ctx = create_test_session().await;
+ if use_sql {
+ let create_table_sql = format!(
+ "CREATE EXTERNAL TABLE {} STORED AS HUDI LOCATION '{}' {}",
+ test_table.as_ref(),
+ test_table.path(),
+ concat_as_sql_options(options)
+ );
+ ctx.sql(create_table_sql.as_str()).await?;
+ } else {
+ let base_url = test_table.url();
+ let hudi = HudiDataSource::new_with_options(base_url.as_str(),
options).await?;
+ ctx.register_table(test_table.as_ref(), Arc::new(hudi))?;
+ }
+ Ok(ctx)
+ }
+
+ /// Creates a `SessionContext` with SQL identifier normalization disabled,
+ /// DataFusion's default features enabled, and `HudiTableFactory` registered
+ /// under the `HUDI` file type so `STORED AS HUDI` statements resolve to Hudi tables.
+ async fn create_test_session() -> SessionContext {
let config = SessionConfig::new().set(
"datafusion.sql_parser.enable_ident_normalization",
&ScalarValue::from(false),
);
- let ctx = SessionContext::new_with_config(config);
- let base_url = test_table.url();
- let hudi = HudiDataSource::new_with_options(base_url.as_str(), options)
- .await
- .unwrap();
- ctx.register_table(test_table.as_ref(), Arc::new(hudi))
- .unwrap();
- ctx
+ let mut session_state = SessionStateBuilder::new()
+ .with_default_features()
+ .with_config(config)
+ .build();
+ session_state
+ .table_factories_mut()
+ .insert("HUDI".to_string(), Arc::new(HudiTableFactory::new()));
+
+ SessionContext::new_with_state(session_state)
+ }
+
+    /// Renders `options` as the `OPTIONS ('key' 'value', ...)` clause of a
+    /// `CREATE EXTERNAL TABLE` statement, or as an empty string when there are none.
+    ///
+    /// Single quotes inside keys and values are doubled (`'` -> `''`, the SQL escape
+    /// for a quote within a string literal). A quote in an option therefore cannot
+    /// terminate its literal early and corrupt the generated statement.
+    fn concat_as_sql_options<I, K, V>(options: I) -> String
+    where
+        I: IntoIterator<Item = (K, V)>,
+        K: AsRef<str>,
+        V: Into<String>,
+    {
+        // Wraps `s` in single quotes, escaping any embedded single quotes.
+        fn quote(s: &str) -> String {
+            format!("'{}'", s.replace('\'', "''"))
+        }
+
+        let kv_pairs: Vec<String> = options
+            .into_iter()
+            .map(|(k, v)| {
+                let value: String = v.into();
+                format!("{} {}", quote(k.as_ref()), quote(&value))
+            })
+            .collect();
+
+        if kv_pairs.is_empty() {
+            String::new()
+        } else {
+            format!("OPTIONS ({})", kv_pairs.join(", "))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_create_table_with_unknown_format() {
+        // Expects DataFusion to reject a `STORED AS` file type for which the test
+        // session has no registered table factory.
+        let invalid_format = "UNKNOWN_FORMAT";
+        let test_table = V6Nonpartitioned;
+        let table_name = test_table.as_ref();
+        let location = test_table.path();
+        let create_table_sql = format!(
+            "CREATE EXTERNAL TABLE {table_name} STORED AS {invalid_format} LOCATION '{location}'"
+        );
+
+        let ctx = create_test_session().await;
+        assert!(ctx.sql(&create_table_sql).await.is_err());
}
async fn verify_plan(
@@ -236,16 +346,18 @@ mod tests {
#[tokio::test]
async fn datafusion_read_hudi_table() {
- for (test_table, planned_input_partitions) in &[
- (V6ComplexkeygenHivestyle, 2),
- (V6Nonpartitioned, 1),
- (V6SimplekeygenNonhivestyle, 2),
- (V6SimplekeygenHivestyleNoMetafields, 2),
- (V6TimebasedkeygenNonhivestyle, 2),
+ for (test_table, use_sql, planned_input_partitions) in &[
+ (V6ComplexkeygenHivestyle, true, 2),
+ (V6Nonpartitioned, true, 1),
+ (V6SimplekeygenNonhivestyle, false, 2),
+ (V6SimplekeygenHivestyleNoMetafields, true, 2),
+ (V6TimebasedkeygenNonhivestyle, false, 2),
] {
println!(">>> testing for {}", test_table.as_ref());
- let options = vec![(InputPartitions.as_ref(), "2")];
- let ctx = prepare_session_context(test_table, options).await;
+ let options = [(InputPartitions, "2")];
+ let ctx = register_test_table_with_session(test_table, options,
*use_sql)
+ .await
+ .unwrap();
let sql = format!(
r#"
@@ -275,12 +387,14 @@ mod tests {
#[tokio::test]
async fn datafusion_read_hudi_table_with_replacecommits() {
- for (test_table, planned_input_partitions) in
- &[(V6SimplekeygenNonhivestyleOverwritetable, 1)]
+ for (test_table, use_sql, planned_input_partitions) in
+ &[(V6SimplekeygenNonhivestyleOverwritetable, true, 1)]
{
println!(">>> testing for {}", test_table.as_ref());
let ctx =
- prepare_session_context(test_table,
vec![(InputPartitions.as_ref(), "2")]).await;
+ register_test_table_with_session(test_table,
[(InputPartitions, "2")], *use_sql)
+ .await
+ .unwrap();
let sql = format!(
r#"