This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 97f7491ed6 refactor(substrait): refactor ReadRel consumer (#12983)
97f7491ed6 is described below

commit 97f7491ed62ed7643b8b466237fd1ceb19a54431
Author: Tornike Gurgenidze <[email protected]>
AuthorDate: Fri Oct 18 23:06:45 2024 +0400

    refactor(substrait): refactor ReadRel consumer (#12983)
---
 datafusion/substrait/src/logical_plan/consumer.rs | 181 +++++++++++-----------
 1 file changed, 87 insertions(+), 94 deletions(-)

diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs
index 4af02858e6..08e54166d3 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -794,60 +794,61 @@ pub async fn from_substrait_rel(
             let (left, right) = requalify_sides_if_needed(left, right)?;
             left.cross_join(right.build()?)?.build()
         }
-        Some(RelType::Read(read)) => match &read.as_ref().read_type {
-            Some(ReadType::NamedTable(nt)) => {
-                let named_struct = read.base_schema.as_ref().ok_or_else(|| {
-                    substrait_datafusion_err!("No base schema provided for 
Named Table")
-                })?;
+        Some(RelType::Read(read)) => {
+            fn read_with_schema(
+                df: DataFrame,
+                schema: DFSchema,
+                projection: &Option<MaskExpression>,
+            ) -> Result<LogicalPlan> {
+                ensure_schema_compatability(df.schema().to_owned(), 
schema.clone())?;
 
-                let table_reference = match nt.names.len() {
-                    0 => {
-                        return plan_err!("No table name found in NamedTable");
-                    }
-                    1 => TableReference::Bare {
-                        table: nt.names[0].clone().into(),
-                    },
-                    2 => TableReference::Partial {
-                        schema: nt.names[0].clone().into(),
-                        table: nt.names[1].clone().into(),
-                    },
-                    _ => TableReference::Full {
-                        catalog: nt.names[0].clone().into(),
-                        schema: nt.names[1].clone().into(),
-                        table: nt.names[2].clone().into(),
-                    },
-                };
+                let schema = apply_masking(schema, projection)?;
 
-                let t = ctx.table(table_reference.clone()).await?;
+                apply_projection(df, schema)
+            }
 
-                let substrait_schema =
-                    from_substrait_named_struct(named_struct, extensions)?
-                        .replace_qualifier(table_reference);
+            let named_struct = read.base_schema.as_ref().ok_or_else(|| {
+                substrait_datafusion_err!("No base schema provided for Read 
Relation")
+            })?;
 
-                ensure_schema_compatability(
-                    t.schema().to_owned(),
-                    substrait_schema.clone(),
-                )?;
+            let substrait_schema = from_substrait_named_struct(named_struct, 
extensions)?;
 
-                let substrait_schema = apply_masking(substrait_schema, 
&read.projection)?;
+            match &read.as_ref().read_type {
+                Some(ReadType::NamedTable(nt)) => {
+                    let table_reference = match nt.names.len() {
+                        0 => {
+                            return plan_err!("No table name found in 
NamedTable");
+                        }
+                        1 => TableReference::Bare {
+                            table: nt.names[0].clone().into(),
+                        },
+                        2 => TableReference::Partial {
+                            schema: nt.names[0].clone().into(),
+                            table: nt.names[1].clone().into(),
+                        },
+                        _ => TableReference::Full {
+                            catalog: nt.names[0].clone().into(),
+                            schema: nt.names[1].clone().into(),
+                            table: nt.names[2].clone().into(),
+                        },
+                    };
 
-                apply_projection(t, substrait_schema)
-            }
-            Some(ReadType::VirtualTable(vt)) => {
-                let base_schema = read.base_schema.as_ref().ok_or_else(|| {
-                    substrait_datafusion_err!("No base schema provided for 
Virtual Table")
-                })?;
+                    let t = ctx.table(table_reference.clone()).await?;
 
-                let schema = from_substrait_named_struct(base_schema, 
extensions)?;
+                    let substrait_schema =
+                        substrait_schema.replace_qualifier(table_reference);
 
-                if vt.values.is_empty() {
-                    return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
-                        produce_one_row: false,
-                        schema: DFSchemaRef::new(schema),
-                    }));
+                    read_with_schema(t, substrait_schema, &read.projection)
                 }
+                Some(ReadType::VirtualTable(vt)) => {
+                    if vt.values.is_empty() {
+                        return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
+                            produce_one_row: false,
+                            schema: DFSchemaRef::new(substrait_schema),
+                        }));
+                    }
 
-                let values = vt
+                    let values = vt
                     .values
                     .iter()
                     .map(|row| {
@@ -860,79 +861,71 @@ pub async fn from_substrait_rel(
                                 Ok(Expr::Literal(from_substrait_literal(
                                     lit,
                                     extensions,
-                                    &base_schema.names,
+                                    &named_struct.names,
                                     &mut name_idx,
                                 )?))
                             })
                             .collect::<Result<_>>()?;
-                        if name_idx != base_schema.names.len() {
+                        if name_idx != named_struct.names.len() {
                             return substrait_err!(
                                 "Names list must match exactly to nested 
schema, but found {} uses for {} names",
                                 name_idx,
-                                base_schema.names.len()
+                                named_struct.names.len()
                             );
                         }
                         Ok(lits)
                     })
                     .collect::<Result<_>>()?;
 
-                Ok(LogicalPlan::Values(Values {
-                    schema: DFSchemaRef::new(schema),
-                    values,
-                }))
-            }
-            Some(ReadType::LocalFiles(lf)) => {
-                let named_struct = read.base_schema.as_ref().ok_or_else(|| {
-                    substrait_datafusion_err!("No base schema provided for 
LocalFiles")
-                })?;
-
-                fn extract_filename(name: &str) -> Option<String> {
-                    let corrected_url =
-                        if name.starts_with("file://") && 
!name.starts_with("file:///") {
+                    Ok(LogicalPlan::Values(Values {
+                        schema: DFSchemaRef::new(substrait_schema),
+                        values,
+                    }))
+                }
+                Some(ReadType::LocalFiles(lf)) => {
+                    fn extract_filename(name: &str) -> Option<String> {
+                        let corrected_url = if name.starts_with("file://")
+                            && !name.starts_with("file:///")
+                        {
                             name.replacen("file://", "file:///", 1)
                         } else {
                             name.to_string()
                         };
 
-                    Url::parse(&corrected_url).ok().and_then(|url| {
-                        let path = url.path();
-                        std::path::Path::new(path)
-                            .file_name()
-                            .map(|filename| 
filename.to_string_lossy().to_string())
-                    })
-                }
-
-                // we could use the file name to check the original table 
provider
-                // TODO: currently does not support multiple local files
-                let filename: Option<String> =
-                    lf.items.first().and_then(|x| match x.path_type.as_ref() {
-                        Some(UriFile(name)) => extract_filename(name),
-                        _ => None,
-                    });
-
-                if lf.items.len() > 1 || filename.is_none() {
-                    return not_impl_err!("Only single file reads are 
supported");
-                }
-                let name = filename.unwrap();
-                // directly use unwrap here since we could determine it is a 
valid one
-                let table_reference = TableReference::Bare { table: 
name.into() };
-                let t = ctx.table(table_reference.clone()).await?;
+                        Url::parse(&corrected_url).ok().and_then(|url| {
+                            let path = url.path();
+                            std::path::Path::new(path)
+                                .file_name()
+                                .map(|filename| 
filename.to_string_lossy().to_string())
+                        })
+                    }
 
-                let substrait_schema =
-                    from_substrait_named_struct(named_struct, extensions)?
-                        .replace_qualifier(table_reference);
+                    // we could use the file name to check the original table 
provider
+                    // TODO: currently does not support multiple local files
+                    let filename: Option<String> =
+                        lf.items.first().and_then(|x| match 
x.path_type.as_ref() {
+                            Some(UriFile(name)) => extract_filename(name),
+                            _ => None,
+                        });
 
-                ensure_schema_compatability(
-                    t.schema().to_owned(),
-                    substrait_schema.clone(),
-                )?;
+                    if lf.items.len() > 1 || filename.is_none() {
+                        return not_impl_err!("Only single file reads are 
supported");
+                    }
+                    let name = filename.unwrap();
+                    // directly use unwrap here since we could determine it is 
a valid one
+                    let table_reference = TableReference::Bare { table: 
name.into() };
+                    let t = ctx.table(table_reference.clone()).await?;
 
-                let substrait_schema = apply_masking(substrait_schema, 
&read.projection)?;
+                    let substrait_schema =
+                        substrait_schema.replace_qualifier(table_reference);
 
-                apply_projection(t, substrait_schema)
+                    read_with_schema(t, substrait_schema, &read.projection)
+                }
+                _ => {
+                    not_impl_err!("Unsupported ReadType: {:?}", 
&read.as_ref().read_type)
+                }
             }
-            _ => not_impl_err!("Unsupported ReadType: {:?}", 
&read.as_ref().read_type),
-        },
+        }
         Some(RelType::Set(set)) => match set_rel::SetOp::try_from(set.op) {
             Ok(set_op) => match set_op {
                 set_rel::SetOp::UnionAll => {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to