westonpace commented on code in PR #12800:
URL: https://github.com/apache/datafusion/pull/12800#discussion_r1792348343


##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -340,41 +337,20 @@ pub fn extract_projection(
                     .iter()
                     .map(|item| item.field as usize)
                     .collect();
-                match t {
-                    LogicalPlan::TableScan(mut scan) => {
-                        let fields = column_indices
-                            .iter()
-                            .map(|i| scan.projected_schema.qualified_field(*i))
-                            .map(|(qualifier, field)| {
-                                (qualifier.cloned(), Arc::new(field.clone()))
-                            })
-                            .collect();
-                        scan.projection = Some(column_indices);
-                        scan.projected_schema = DFSchemaRef::new(
-                            DFSchema::new_with_metadata(fields, HashMap::new())?,
-                        );
-                        Ok(LogicalPlan::TableScan(scan))
-                    }
-                    LogicalPlan::Projection(projection) => {
-                        // create another Projection around the Projection to handle the field masking
-                        let fields: Vec<Expr> = column_indices
-                            .into_iter()
-                            .map(|i| {
-                                let (qualifier, field) =
-                                    projection.schema.qualified_field(i);
-                                let column =
-                                    Column::new(qualifier.cloned(), field.name());
-                                Expr::Column(column)
-                            })
-                            .collect();
-                        project(LogicalPlan::Projection(projection), fields)
-                    }
-                    _ => plan_err!("unexpected plan for table"),
-                }
+
+                let fields = column_indices
+                    .iter()
+                    .map(|i| schema.qualified_field(*i))
+                    .map(|(qualifier, field)| {
+                        (qualifier.cloned(), Arc::new(field.clone()))
+                    })
+                    .collect();
+
+                Ok(DFSchema::new_with_metadata(fields, HashMap::new())?)

Review Comment:
   Feel free to ignore; I'm still learning this code. Would it be more future-proof to take the metadata from `schema`? I assume `HashMap::new()` is OK here because `schema` comes from Substrait and thus has no metadata?
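   
   For example (untested, and assuming `schema` here is the Substrait-derived `DFSchema` that `fields` was built from), something like this is what I had in mind:
   
   ```rust
   // Hypothetical tweak: carry forward whatever metadata `schema` has instead of
   // always starting from an empty map.
   Ok(DFSchema::new_with_metadata(fields, schema.metadata().clone())?)
   ```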



##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -990,24 +978,46 @@ pub async fn from_substrait_rel(
 fn ensure_schema_compatability(
     table: DataFrame,
     substrait_schema: DFSchema,
-) -> Result<DataFrame> {
-    let df_schema = table.schema().to_owned().strip_qualifiers();
+) -> Result<LogicalPlan> {

Review Comment:
   Feel free to ignore. It's strange to me, as a new reader, that a method named `ensure_..._compatability` is not an assertion/check/validation-style method but instead a transformation of some kind. What is the purpose of this method? It looks like it is converting a projection and a table reference into a table scan?



##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -990,24 +978,46 @@ pub async fn from_substrait_rel(
 fn ensure_schema_compatability(
     table: DataFrame,
     substrait_schema: DFSchema,
-) -> Result<DataFrame> {
-    let df_schema = table.schema().to_owned().strip_qualifiers();
+) -> Result<LogicalPlan> {
+    let df_schema = table.schema().to_owned();
+
+    let t = table.into_unoptimized_plan();
+
     if df_schema.logically_equivalent_names_and_types(&substrait_schema) {
-        return Ok(table);
+        return Ok(t);
     }
-    let selected_columns = substrait_schema
-        .strip_qualifiers()
-        .fields()
-        .iter()
-        .map(|substrait_field| {
-            let df_field =
-                df_schema.field_with_unqualified_name(substrait_field.name())?;
-            ensure_field_compatability(df_field, substrait_field)?;
-            Ok(col(format!("\"{}\"", df_field.name())))
-        })
-        .collect::<Result<_>>()?;
 
-    table.select(selected_columns)
+    match t {
+        LogicalPlan::TableScan(mut scan) => {
+            let column_indices: Vec<usize> = substrait_schema
+                .strip_qualifiers()
+                .fields()
+                .iter()
+                .map(|substrait_field| {
+                    let df_field =
+                        df_schema.field_with_unqualified_name(substrait_field.name())?;
+                    ensure_field_compatability(df_field, substrait_field)?;
+
+                    Ok(df_schema
+                        .index_of_column_by_name(None, substrait_field.name().as_str())
+                        .unwrap())

Review Comment:
   Just checking my understanding.
   
   First, we get the schema from Substrait.
   Next, we do a name-based lookup into some info about the table (presumably from the catalog) to get "column-in-file" indices.
   Finally, we scan with those column-in-file indices as the projection.
   
   This way, if the columns in the file are in a completely different order we are OK, as long as the name-based lookup matches?
   
   This sounds correct to me.
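   
   Put differently, here is a tiny standalone sketch (not the PR's code, just how I'm picturing the name-to-index matching) showing why the file's column order doesn't matter as long as the names line up:
   
   ```rust
   /// Build projection indices by looking up each wanted column name in the
   /// table's (file's) column list, regardless of the order it stores them in.
   fn projection_by_name(wanted: &[&str], table_columns: &[&str]) -> Option<Vec<usize>> {
       wanted
           .iter()
           .map(|name| table_columns.iter().position(|col| col == name))
           .collect()
   }
   
   fn main() {
       // The file stores its columns in a different order than the Substrait schema.
       let table_columns = ["c", "a", "b"];
       let substrait_schema = ["a", "b", "c"];
       // a -> index 1, b -> index 2, c -> index 0 in the file.
       assert_eq!(
           projection_by_name(&substrait_schema, &table_columns),
           Some(vec![1, 2, 0])
       );
   }
   ```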



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

