tokoko commented on code in PR #12800:
URL: https://github.com/apache/datafusion/pull/12800#discussion_r1796173482


##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -984,30 +984,64 @@ pub async fn from_substrait_rel(
 /// 1. All fields present in the Substrait schema are present in the DataFusion schema. The
 ///    DataFusion schema may have MORE fields, but not the other way around.
 /// 2. All fields are compatible. See [`ensure_field_compatability`] for details
-///
-/// This function returns a DataFrame with fields adjusted if necessary in the event that the
-/// Substrait schema is a subset of the DataFusion schema.
 fn ensure_schema_compatability(
-    table: DataFrame,
+    table_schema: DFSchema,
     substrait_schema: DFSchema,
-) -> Result<DataFrame> {
-    let df_schema = table.schema().to_owned().strip_qualifiers();
-    if df_schema.logically_equivalent_names_and_types(&substrait_schema) {
-        return Ok(table);
-    }
-    let selected_columns = substrait_schema
+) -> Result<()> {
+    substrait_schema
         .strip_qualifiers()
         .fields()
         .iter()
         .map(|substrait_field| {
             let df_field =
-                df_schema.field_with_unqualified_name(substrait_field.name())?;
-            ensure_field_compatability(df_field, substrait_field)?;
-            Ok(col(format!("\"{}\"", df_field.name())))
+                table_schema.field_with_unqualified_name(substrait_field.name())?;
+            ensure_field_compatability(df_field, substrait_field)
         })
-        .collect::<Result<_>>()?;
+        .collect::<Result<()>>()?;
+
+    Ok(())
+}
+
+/// This function returns a DataFrame with fields adjusted if necessary in the event that the
+/// Substrait schema is a subset of the DataFusion schema.
+fn apply_projection(table: DataFrame, substrait_schema: DFSchema) -> Result<LogicalPlan> {
+    let df_schema = table.schema().to_owned();
 
-    table.select(selected_columns)
+    let t = table.into_unoptimized_plan();
+
+    if df_schema.logically_equivalent_names_and_types(&substrait_schema) {
+        return Ok(t);
+    }
+
+    match t {
+        LogicalPlan::TableScan(mut scan) => {
+            let column_indices: Vec<usize> = substrait_schema
+                .strip_qualifiers()
+                .fields()
+                .iter()
+                .map(|substrait_field| {
+                    Ok(df_schema
+                        .index_of_column_by_name(None, substrait_field.name().as_str())
+                        .unwrap())
+                })
+                .collect::<Result<_>>()?;
+
+            let fields = column_indices
+                .iter()
+                .map(|i| df_schema.qualified_field(*i))
+                .map(|(qualifier, field)| (qualifier.cloned(), Arc::new(field.clone())))
+                .collect();
+
+            scan.projected_schema = DFSchemaRef::new(DFSchema::new_with_metadata(
+                fields,
+                df_schema.metadata().clone(),
+            )?);
+            scan.projection = Some(column_indices);
+
+            Ok(LogicalPlan::TableScan(scan))
+        }
+        _ => Ok(t),

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to