alamb commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r664735397



##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -149,47 +147,80 @@ impl DFSchema {
         )))
     }
 
-    /// Find the index of the column with the given qualifer and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            let field = &self.fields[i];
-            if field.qualifier() == col.relation.as_ref() && field.name() == 
&col.name {
-                return Ok(i);
-            }
+    fn index_of_column_by_name(
+        &self,
+        qualifier: Option<&str>,
+        name: &str,
+    ) -> Result<usize> {
+        let matches: Vec<usize> = self
+            .fields
+            .iter()
+            .enumerate()
+            .filter(|(_, field)| match (qualifier, &field.qualifier) {
+                // field to lookup is qualified.
+                // current field is qualified and not shared between 
relations, compare both
+                // qualifer and name.
+                (Some(q), Some(field_q)) => q == field_q && field.name() == 
name,
+                // field to lookup is qualified but current field is 
unqualified.
+                (Some(_), None) => false,
+                // field to lookup is unqualified, no need to compare qualifier
+                _ => field.name() == name,
+            })
+            .map(|(idx, _)| idx)
+            .collect();
+

Review comment:
       It probably doesn't matter, but you could avoid the `Vec` allocation by 
something like:
   
   ```rust
           // same iterator chain as above, just without the `.collect()`
           let mut matches = self....;
           match matches.next() {
               None => ..., // error about no field
               Some(idx) => {
                   if matches.next().is_some() {
                       ... // error about ambiguous reference
                   } else {
                       Ok(idx)
                   }
               }
           }
   ```

##########
File path: datafusion/src/optimizer/filter_push_down.rs
##########
@@ -901,20 +979,61 @@ mod tests {
             format!("{:?}", plan),
             "\
             Filter: #test.a LtEq Int64(1)\
-            \n  Join: #test.a = #test.a\
+            \n  Join: #test.a = #test2.a\

Review comment:
       👍 

##########
File path: datafusion/src/logical_plan/plan.rs
##########
@@ -354,6 +356,43 @@ impl LogicalPlan {
             | LogicalPlan::CreateExternalTable { .. } => vec![],
         }
     }
+
+    /// returns all `Using` join columns in a logical plan
+    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, 
DataFusionError> {

Review comment:
       FWIW this function feels like it might better belong in some sort of 
utils rather than a method on `LogicalPlan` -- perhaps 
https://github.com/houqp/arrow-datafusion/blob/qp_join/datafusion/src/optimizer/utils.rs#L50
 

##########
File path: datafusion/src/optimizer/filter_push_down.rs
##########
@@ -232,6 +241,38 @@ fn split_members<'a>(predicate: &'a Expr, predicates: &mut 
Vec<&'a Expr>) {
     }
 }
 
+fn optimize_join(

Review comment:
       Nice

##########
File path: datafusion/src/physical_plan/hash_join.rs
##########
@@ -1996,16 +1992,16 @@ mod tests {
 
         let (columns, batches) = join_collect(left, right, on, 
&JoinType::Right).await?;
 
-        assert_eq!(columns, vec!["a1", "c1", "a2", "b1", "c2"]);
+        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
 
         let expected = vec![
-            "+----+----+----+----+----+",
-            "| a1 | c1 | a2 | b1 | c2 |",
-            "+----+----+----+----+----+",
-            "|    |    | 30 | 6  | 90 |",
-            "| 1  | 7  | 10 | 4  | 70 |",
-            "| 2  | 8  | 20 | 5  | 80 |",
-            "+----+----+----+----+----+",
+            "+----+----+----+----+----+----+",

Review comment:
       these changes make sense to me

##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -1118,36 +1133,56 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) 
-> Expr {
     }
 }
 
+/// Recursively replace all Column expressions in a given expression tree with 
Column expressions
+/// provided by the hash map argument.
+pub fn replace_col(e: Expr, replace_map: &HashMap<&Column, &Column>) -> 
Result<Expr> {
+    struct ColumnReplacer<'a> {
+        replace_map: &'a HashMap<&'a Column, &'a Column>,
+    }
+
+    impl<'a> ExprRewriter for ColumnReplacer<'a> {
+        fn mutate(&mut self, expr: Expr) -> Result<Expr> {
+            if let Expr::Column(c) = &expr {
+                match self.replace_map.get(c) {
+                    Some(new_c) => Ok(Expr::Column((*new_c).to_owned())),
+                    None => Ok(expr),
+                }
+            } else {
+                Ok(expr)
+            }
+        }
+    }
+
+    e.rewrite(&mut ColumnReplacer { replace_map })
+}
+
 /// Recursively call [`Column::normalize`] on all Column expressions
 /// in the `expr` expression tree.
-pub fn normalize_col(e: Expr, schemas: &[&DFSchemaRef]) -> Result<Expr> {
-    struct ColumnNormalizer<'a, 'b> {
-        schemas: &'a [&'b DFSchemaRef],
+pub fn normalize_col(e: Expr, plan: &LogicalPlan) -> Result<Expr> {

Review comment:
       Note: I wrote some tests that will need to be adjusted in 
https://github.com/apache/arrow-datafusion/pull/689, but that is no big deal.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to