jorgecarleitao commented on a change in pull request #8709:
URL: https://github.com/apache/arrow/pull/8709#discussion_r527061776



##########
File path: rust/datafusion/src/physical_plan/hash_utils.rs
##########
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Functionality used both on logical and physical plans
+
+use crate::error::{DataFusionError, Result};
+use arrow::datatypes::{Field, Schema};
+use std::collections::HashSet;
+
+/// All valid types of joins.
+#[derive(Clone, Debug)]
+pub enum JoinType {
+    /// Inner join
+    Inner,
+}
+
+/// Checks whether the schemas "left" and "right" and columns "on" represent a valid join.
+/// They are valid whenever their columns' intersection equals the set `on`.
+pub fn check_join_is_valid(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+) -> Result<()> {
+    let left: HashSet<String> =
+        left.fields().iter().map(|f| f.name().clone()).collect();
+    let right: HashSet<String> =
+        right.fields().iter().map(|f| f.name().clone()).collect();
+
+    check_join_set_is_valid(&left, &right, &on)
+}
+
+/// Checks whether the sets left, right and on compose a valid join.
+/// They are valid whenever their intersection equals the set `on`.
+fn check_join_set_is_valid(
+    left: &HashSet<String>,
+    right: &HashSet<String>,
+    on: &HashSet<String>,
+) -> Result<()> {
+    if on.is_empty() {
+        return Err(DataFusionError::Plan(
+            "The 'on' clause of a join cannot be empty".to_string(),
+        ));
+    }
+
+    let on_columns = on.iter().collect::<HashSet<_>>();
+    let common_columns = left.intersection(&right).collect::<HashSet<_>>();
+    let missing = on_columns
+        .difference(&common_columns)
+        .collect::<HashSet<_>>();
+    if !missing.is_empty() {
+        return Err(DataFusionError::Plan(format!(
+            "The left or right side of the join is missing columns from \"on\": {:?}\nLeft: {:?}\nRight: {:?}\nOn: {:?}",
+            missing, left, right, on,
+        )));
+    };
+    Ok(())
+}
+
+/// Creates a schema for a join operation.
+/// The fields "on" from the left side are always first
+pub fn build_join_schema(
+    left: &Schema,
+    right: &Schema,
+    on: &HashSet<String>,
+    join_type: &JoinType,
+) -> Schema {
+    let fields: Vec<Field> = match join_type {
+        JoinType::Inner => {
+            // inner: all fields are there
+
+            let on_fields = left.fields().iter().filter(|f| on.contains(f.name()));
+
+            let left_fields = left.fields().iter().filter(|f| !on.contains(f.name()));
+
+            let right_fields = right.fields().iter().filter(|f| !on.contains(f.name()));
+
+            // "on" fields come first by construction, then left, then right
+            on_fields
+                .chain(left_fields)
+                .chain(right_fields)
+                .cloned()
+                .collect()

Review comment:
       > For example if we join table 'a' with fields id and name with table 'b' that also has fields id and name, the output schema would be a.id, a.name, b.id, b.name.
   
   In general the nodes are not named, which means that `'a'` is arbitrary. Spark DataFrames do not append qualifiers, as nodes are usually not named (`df.select('a', 'b').join(df1.select('a', 'c'), on='a')` returns columns `'a','b','c'`).
   
   What I dislike in Spark is that it allows two columns to have the same name, i.e. `df.select('a', 'b').join(df1.select('a', 'b'), on='a')` returns columns `'a','b','b'`, but if we then write `.select('b')`, it raises.
   
   The current implementation uses what I consider an ideal approach: no qualifiers are added, but planning the join raises an error if two columns would end up with the same name (as we do for other nodes). I.e. this would be possible:
   
   ```rust
   df
       .select('a', 'b')
       .join(df1, on=col("a", 0) == col("a", 1))
       .select(vec![col('a')])
   ```
   
   If we introduce qualifiers, we need to use something like
   
   ```rust
   df
       .select('a', 'b')
       .join(df1, on=on_col("a", 0) == on_col("a", 1))
       .select(vec![col('?.b')])
   ```
   
   where `?` is an arbitrary qualifier (`1`? `t1`? `a`?). Moreover, that qualifier will depend on whether `df1` has a column named `b` or not, which IMO leads to surprising results (the resulting schema may be `?.b` or `b` depending on whether the nodes have colliding column names).
   
   The current code implements the principle of least surprise together with "explicit is better than implicit": columns are named according to what the user set them to be, and it is the user's responsibility to ensure that no two column names collide (besides the `on` columns) in a join; otherwise DataFusion errors with a helpful message.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
