This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 97f7491ed6 refactor(substrait): refactor ReadRel consumer (#12983)
97f7491ed6 is described below
commit 97f7491ed62ed7643b8b466237fd1ceb19a54431
Author: Tornike Gurgenidze <[email protected]>
AuthorDate: Fri Oct 18 23:06:45 2024 +0400
refactor(substrait): refactor ReadRel consumer (#12983)
---
datafusion/substrait/src/logical_plan/consumer.rs | 181 +++++++++++-----------
1 file changed, 87 insertions(+), 94 deletions(-)
diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs
index 4af02858e6..08e54166d3 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -794,60 +794,61 @@ pub async fn from_substrait_rel(
let (left, right) = requalify_sides_if_needed(left, right)?;
left.cross_join(right.build()?)?.build()
}
- Some(RelType::Read(read)) => match &read.as_ref().read_type {
- Some(ReadType::NamedTable(nt)) => {
- let named_struct = read.base_schema.as_ref().ok_or_else(|| {
- substrait_datafusion_err!("No base schema provided for
Named Table")
- })?;
+ Some(RelType::Read(read)) => {
+ fn read_with_schema(
+ df: DataFrame,
+ schema: DFSchema,
+ projection: &Option<MaskExpression>,
+ ) -> Result<LogicalPlan> {
+ ensure_schema_compatability(df.schema().to_owned(),
schema.clone())?;
- let table_reference = match nt.names.len() {
- 0 => {
- return plan_err!("No table name found in NamedTable");
- }
- 1 => TableReference::Bare {
- table: nt.names[0].clone().into(),
- },
- 2 => TableReference::Partial {
- schema: nt.names[0].clone().into(),
- table: nt.names[1].clone().into(),
- },
- _ => TableReference::Full {
- catalog: nt.names[0].clone().into(),
- schema: nt.names[1].clone().into(),
- table: nt.names[2].clone().into(),
- },
- };
+ let schema = apply_masking(schema, projection)?;
- let t = ctx.table(table_reference.clone()).await?;
+ apply_projection(df, schema)
+ }
- let substrait_schema =
- from_substrait_named_struct(named_struct, extensions)?
- .replace_qualifier(table_reference);
+ let named_struct = read.base_schema.as_ref().ok_or_else(|| {
+ substrait_datafusion_err!("No base schema provided for Read
Relation")
+ })?;
- ensure_schema_compatability(
- t.schema().to_owned(),
- substrait_schema.clone(),
- )?;
+ let substrait_schema = from_substrait_named_struct(named_struct,
extensions)?;
- let substrait_schema = apply_masking(substrait_schema,
&read.projection)?;
+ match &read.as_ref().read_type {
+ Some(ReadType::NamedTable(nt)) => {
+ let table_reference = match nt.names.len() {
+ 0 => {
+ return plan_err!("No table name found in
NamedTable");
+ }
+ 1 => TableReference::Bare {
+ table: nt.names[0].clone().into(),
+ },
+ 2 => TableReference::Partial {
+ schema: nt.names[0].clone().into(),
+ table: nt.names[1].clone().into(),
+ },
+ _ => TableReference::Full {
+ catalog: nt.names[0].clone().into(),
+ schema: nt.names[1].clone().into(),
+ table: nt.names[2].clone().into(),
+ },
+ };
- apply_projection(t, substrait_schema)
- }
- Some(ReadType::VirtualTable(vt)) => {
- let base_schema = read.base_schema.as_ref().ok_or_else(|| {
- substrait_datafusion_err!("No base schema provided for
Virtual Table")
- })?;
+ let t = ctx.table(table_reference.clone()).await?;
- let schema = from_substrait_named_struct(base_schema,
extensions)?;
+ let substrait_schema =
+ substrait_schema.replace_qualifier(table_reference);
- if vt.values.is_empty() {
- return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
- produce_one_row: false,
- schema: DFSchemaRef::new(schema),
- }));
+ read_with_schema(t, substrait_schema, &read.projection)
}
+ Some(ReadType::VirtualTable(vt)) => {
+ if vt.values.is_empty() {
+ return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+ produce_one_row: false,
+ schema: DFSchemaRef::new(substrait_schema),
+ }));
+ }
- let values = vt
+ let values = vt
.values
.iter()
.map(|row| {
@@ -860,79 +861,71 @@ pub async fn from_substrait_rel(
Ok(Expr::Literal(from_substrait_literal(
lit,
extensions,
- &base_schema.names,
+ &named_struct.names,
&mut name_idx,
)?))
})
.collect::<Result<_>>()?;
- if name_idx != base_schema.names.len() {
+ if name_idx != named_struct.names.len() {
return substrait_err!(
"Names list must match exactly to nested
schema, but found {} uses for {} names",
name_idx,
- base_schema.names.len()
+ named_struct.names.len()
);
}
Ok(lits)
})
.collect::<Result<_>>()?;
- Ok(LogicalPlan::Values(Values {
- schema: DFSchemaRef::new(schema),
- values,
- }))
- }
- Some(ReadType::LocalFiles(lf)) => {
- let named_struct = read.base_schema.as_ref().ok_or_else(|| {
- substrait_datafusion_err!("No base schema provided for
LocalFiles")
- })?;
-
- fn extract_filename(name: &str) -> Option<String> {
- let corrected_url =
- if name.starts_with("file://") &&
!name.starts_with("file:///") {
+ Ok(LogicalPlan::Values(Values {
+ schema: DFSchemaRef::new(substrait_schema),
+ values,
+ }))
+ }
+ Some(ReadType::LocalFiles(lf)) => {
+ fn extract_filename(name: &str) -> Option<String> {
+ let corrected_url = if name.starts_with("file://")
+ && !name.starts_with("file:///")
+ {
name.replacen("file://", "file:///", 1)
} else {
name.to_string()
};
- Url::parse(&corrected_url).ok().and_then(|url| {
- let path = url.path();
- std::path::Path::new(path)
- .file_name()
- .map(|filename|
filename.to_string_lossy().to_string())
- })
- }
-
- // we could use the file name to check the original table
provider
- // TODO: currently does not support multiple local files
- let filename: Option<String> =
- lf.items.first().and_then(|x| match x.path_type.as_ref() {
- Some(UriFile(name)) => extract_filename(name),
- _ => None,
- });
-
- if lf.items.len() > 1 || filename.is_none() {
- return not_impl_err!("Only single file reads are
supported");
- }
- let name = filename.unwrap();
- // directly use unwrap here since we could determine it is a
valid one
- let table_reference = TableReference::Bare { table:
name.into() };
- let t = ctx.table(table_reference.clone()).await?;
+ Url::parse(&corrected_url).ok().and_then(|url| {
+ let path = url.path();
+ std::path::Path::new(path)
+ .file_name()
+ .map(|filename|
filename.to_string_lossy().to_string())
+ })
+ }
- let substrait_schema =
- from_substrait_named_struct(named_struct, extensions)?
- .replace_qualifier(table_reference);
+ // we could use the file name to check the original table
provider
+ // TODO: currently does not support multiple local files
+ let filename: Option<String> =
+ lf.items.first().and_then(|x| match
x.path_type.as_ref() {
+ Some(UriFile(name)) => extract_filename(name),
+ _ => None,
+ });
- ensure_schema_compatability(
- t.schema().to_owned(),
- substrait_schema.clone(),
- )?;
+ if lf.items.len() > 1 || filename.is_none() {
+ return not_impl_err!("Only single file reads are
supported");
+ }
+ let name = filename.unwrap();
+ // directly use unwrap here since we could determine it is
a valid one
+ let table_reference = TableReference::Bare { table:
name.into() };
+ let t = ctx.table(table_reference.clone()).await?;
- let substrait_schema = apply_masking(substrait_schema,
&read.projection)?;
+ let substrait_schema =
+ substrait_schema.replace_qualifier(table_reference);
- apply_projection(t, substrait_schema)
+ read_with_schema(t, substrait_schema, &read.projection)
+ }
+ _ => {
+ not_impl_err!("Unsupported ReadType: {:?}",
&read.as_ref().read_type)
+ }
}
- _ => not_impl_err!("Unsupported ReadType: {:?}",
&read.as_ref().read_type),
- },
+ }
Some(RelType::Set(set)) => match set_rel::SetOp::try_from(set.op) {
Ok(set_op) => match set_op {
set_rel::SetOp::UnionAll => {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]