tokoko commented on code in PR #12800:
URL: https://github.com/apache/datafusion/pull/12800#discussion_r1794571803
##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -984,30 +984,64 @@ pub async fn from_substrait_rel(
/// 1. All fields present in the Substrait schema are present in the
DataFusion schema. The
/// DataFusion schema may have MORE fields, but not the other way around.
/// 2. All fields are compatible. See [`ensure_field_compatability`] for
details
-///
-/// This function returns a DataFrame with fields adjusted if necessary in the
event that the
-/// Substrait schema is a subset of the DataFusion schema.
fn ensure_schema_compatability(
- table: DataFrame,
+ table_schema: DFSchema,
substrait_schema: DFSchema,
-) -> Result<DataFrame> {
- let df_schema = table.schema().to_owned().strip_qualifiers();
- if df_schema.logically_equivalent_names_and_types(&substrait_schema) {
- return Ok(table);
- }
- let selected_columns = substrait_schema
+) -> Result<()> {
+ substrait_schema
.strip_qualifiers()
.fields()
.iter()
.map(|substrait_field| {
let df_field =
- df_schema.field_with_unqualified_name(substrait_field.name())?;
- ensure_field_compatability(df_field, substrait_field)?;
- Ok(col(format!("\"{}\"", df_field.name())))
+
table_schema.field_with_unqualified_name(substrait_field.name())?;
+ ensure_field_compatability(df_field, substrait_field)
})
- .collect::<Result<_>>()?;
+ .collect::<Result<()>>()?;
+
+ Ok(())
+}
+
+/// This function returns a DataFrame with fields adjusted if necessary in the
event that the
+/// Substrait schema is a subset of the DataFusion schema.
+fn apply_projection(table: DataFrame, substrait_schema: DFSchema) ->
Result<LogicalPlan> {
+ let df_schema = table.schema().to_owned();
- table.select(selected_columns)
+ let t = table.into_unoptimized_plan();
+
+ if df_schema.logically_equivalent_names_and_types(&substrait_schema) {
+ return Ok(t);
+ }
+
+ match t {
+ LogicalPlan::TableScan(mut scan) => {
+ let column_indices: Vec<usize> = substrait_schema
+ .strip_qualifiers()
+ .fields()
+ .iter()
+ .map(|substrait_field| {
+ Ok(df_schema
+ .index_of_column_by_name(None,
substrait_field.name().as_str())
+ .unwrap())
+ })
+ .collect::<Result<_>>()?;
+
+ let fields = column_indices
+ .iter()
+ .map(|i| df_schema.qualified_field(*i))
+ .map(|(qualifier, field)| (qualifier.cloned(),
Arc::new(field.clone())))
+ .collect();
+
+ scan.projected_schema =
DFSchemaRef::new(DFSchema::new_with_metadata(
+ fields,
+ df_schema.metadata().clone(),
+ )?);
+ scan.projection = Some(column_indices);
+
+ Ok(LogicalPlan::TableScan(scan))
+ }
+ _ => Ok(t),
Review Comment:
This function is only ever called with a DataFrame whose plan is a TableScan
node, so the default case should never be reached. I'll change that arm to
return an error indicating it was called with an unexpected plan type.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]