vbarua commented on code in PR #12800:
URL: https://github.com/apache/datafusion/pull/12800#discussion_r1794387461
##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -984,30 +984,64 @@ pub async fn from_substrait_rel(
/// 1. All fields present in the Substrait schema are present in the
DataFusion schema. The
/// DataFusion schema may have MORE fields, but not the other way around.
/// 2. All fields are compatible. See [`ensure_field_compatability`] for
details
-///
-/// This function returns a DataFrame with fields adjusted if necessary in the
event that the
-/// Substrait schema is a subset of the DataFusion schema.
fn ensure_schema_compatability(
- table: DataFrame,
+ table_schema: DFSchema,
substrait_schema: DFSchema,
-) -> Result<DataFrame> {
- let df_schema = table.schema().to_owned().strip_qualifiers();
- if df_schema.logically_equivalent_names_and_types(&substrait_schema) {
- return Ok(table);
- }
- let selected_columns = substrait_schema
+) -> Result<()> {
+ substrait_schema
.strip_qualifiers()
.fields()
.iter()
.map(|substrait_field| {
let df_field =
- df_schema.field_with_unqualified_name(substrait_field.name())?;
- ensure_field_compatability(df_field, substrait_field)?;
- Ok(col(format!("\"{}\"", df_field.name())))
+
table_schema.field_with_unqualified_name(substrait_field.name())?;
+ ensure_field_compatability(df_field, substrait_field)
})
- .collect::<Result<_>>()?;
+ .collect::<Result<()>>()?;
+
+ Ok(())
+}
+
+/// This function returns a DataFrame with fields adjusted if necessary in the
event that the
+/// Substrait schema is a subset of the DataFusion schema.
+fn apply_projection(table: DataFrame, substrait_schema: DFSchema) ->
Result<LogicalPlan> {
+ let df_schema = table.schema().to_owned();
- table.select(selected_columns)
+ let t = table.into_unoptimized_plan();
+
+ if df_schema.logically_equivalent_names_and_types(&substrait_schema) {
+ return Ok(t);
+ }
+
+ match t {
+ LogicalPlan::TableScan(mut scan) => {
+ let column_indices: Vec<usize> = substrait_schema
+ .strip_qualifiers()
+ .fields()
+ .iter()
+ .map(|substrait_field| {
+ Ok(df_schema
+ .index_of_column_by_name(None,
substrait_field.name().as_str())
+ .unwrap())
+ })
+ .collect::<Result<_>>()?;
+
+ let fields = column_indices
+ .iter()
+ .map(|i| df_schema.qualified_field(*i))
+ .map(|(qualifier, field)| (qualifier.cloned(),
Arc::new(field.clone())))
+ .collect();
+
+ scan.projected_schema =
DFSchemaRef::new(DFSchema::new_with_metadata(
+ fields,
+ df_schema.metadata().clone(),
+ )?);
+ scan.projection = Some(column_indices);
+
+ Ok(LogicalPlan::TableScan(scan))
+ }
+ _ => Ok(t),
Review Comment:
Should this return an error? If Substrait and DataFusion don't agree on the
schema and our attempt to fix it silently does nothing, it's unlikely the rest
of the plan will be converted correctly, because any ordinal references further
up in the Substrait plan likely won't point to the right fields anymore.
##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -984,30 +984,64 @@ pub async fn from_substrait_rel(
/// 1. All fields present in the Substrait schema are present in the
DataFusion schema. The
/// DataFusion schema may have MORE fields, but not the other way around.
/// 2. All fields are compatible. See [`ensure_field_compatability`] for
details
-///
-/// This function returns a DataFrame with fields adjusted if necessary in the
event that the
-/// Substrait schema is a subset of the DataFusion schema.
fn ensure_schema_compatability(
- table: DataFrame,
+ table_schema: DFSchema,
substrait_schema: DFSchema,
-) -> Result<DataFrame> {
- let df_schema = table.schema().to_owned().strip_qualifiers();
- if df_schema.logically_equivalent_names_and_types(&substrait_schema) {
- return Ok(table);
- }
- let selected_columns = substrait_schema
+) -> Result<()> {
+ substrait_schema
.strip_qualifiers()
.fields()
.iter()
.map(|substrait_field| {
let df_field =
- df_schema.field_with_unqualified_name(substrait_field.name())?;
- ensure_field_compatability(df_field, substrait_field)?;
- Ok(col(format!("\"{}\"", df_field.name())))
+
table_schema.field_with_unqualified_name(substrait_field.name())?;
+ ensure_field_compatability(df_field, substrait_field)
})
- .collect::<Result<_>>()?;
+ .collect::<Result<()>>()?;
+
+ Ok(())
+}
+
+/// This function returns a DataFrame with fields adjusted if necessary in the
event that the
+/// Substrait schema is a subset of the DataFusion schema.
+fn apply_projection(table: DataFrame, substrait_schema: DFSchema) ->
Result<LogicalPlan> {
Review Comment:
I'm finding the code flow here a little tricky to follow with regard to how
we would handle the ReadRel::filter.
1. This function is consuming a `substrait_schema` that has already had the
mask expressions applied to it.
2. This function creates the DataFusion TableScan from the `table`, which is
what we would need to attach the `ReadRel::filter` to, using something like
`scan.filters = <converted filters>`.
3. But to build the `<converted filters>`, we'll need access to the unmasked
`substrait_schema` in order to correctly resolve any Substrait column
references.
This might be easier to follow, and easier to re-use, if we changed this
function to something like:
```rust
/// Given the table produced by ReadRel::read_type, apply the remaining
ReadRel fields
fn convert_read_rel(table: DataFrame, read_rel: &ReadRel) ->
Result<LogicalPlan> {
let named_struct = read.base_schema.as_ref().ok_or_else(|| {
substrait_datafusion_err!("No base schema provided for ReadRel")
})?;
let substrait_schema =
from_substrait_named_struct(named_struct, extensions)?
.replace_qualifier(table_reference);
ensure_schema_compatability(
table.schema().to_owned(),
substrait_schema.clone(),
)?;
let t = table.into_unoptimized_plan();
// Apply filter (for later)
// Apply best-effort filter (for later)
// Apply masking (already done)
// Apply RelCommon::emit (for later, and yes a ReadRel can have both
masking AND an emit remapping)
todo!()
}
```
Effectively the responsibility of the caller is to produce the underlying
table, and then this function applies the rest of the ReadRel properties which
are common to all of the different read_types.
All that being said, this might be more scope than you intended to take on and
might be better as a follow-up. I'll leave it up to you.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]